Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

馃帀 Source Amplitude: enable event stream time interval selection #21022

Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.23
LABEL io.airbyte.version=0.1.24
LABEL io.airbyte.name=airbyte/source-amplitude
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@
class AmplitudeStream(HttpStream, ABC):
api_version = 2

def __init__(self, data_region: str, **kwargs):
def __init__(self, data_region: str, event_time_interval: dict = None, **kwargs):
if event_time_interval is None:
event_time_interval = {"size_unit": "days", "size": 1}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@evantahler default value is set here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

馃憤 lets try the test again!

self.data_region = data_region
self.event_time_interval = event_time_interval
super().__init__(**kwargs)

@property
Expand Down Expand Up @@ -166,7 +169,12 @@ class Events(IncrementalAmplitudeStream):
compare_date_template = "%Y-%m-%d %H:%M:%S.%f"
primary_key = "uuid"
state_checkpoint_interval = 1000
time_interval = {"days": 1}

@property
def time_interval(self) -> dict:
return {
self.event_time_interval.get('size_unit'): self.event_time_interval.get("size")
}
evantahler marked this conversation as resolved.
Show resolved Hide resolved

def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Mapping]:
state_value = stream_state[self.cursor_field] if stream_state else self._start_date.strftime(self.compare_date_template)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,12 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
return [
Cohorts(authenticator=auth, data_region=config["data_region"]),
Annotations(authenticator=auth, data_region=config["data_region"]),
Events(authenticator=auth, start_date=config["start_date"], data_region=config["data_region"]),
Events(
authenticator=auth,
start_date=config["start_date"],
data_region=config["data_region"],
event_time_interval=config.get("event_time_interval", None)
),
ActiveUsers(authenticator=auth, start_date=config["start_date"], data_region=config["data_region"]),
AverageSessionLength(authenticator=auth, start_date=config["start_date"], data_region=config["data_region"]),
]
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,33 @@
"title": "Replication Start Date",
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$",
"description": "UTC date and time in the format 2021-01-25T00:00:00Z. Any data before this date will not be replicated.",
"examples": ["2021-01-25T00:00:00Z"],
"format": "date-time"
"examples": ["2021-01-25T00:00:00Z"]
},
"event_time_interval": {
"type": "object",
"title": "Event Stream Time Interval",
"description": "Amplitude event stream time interval",
"required": ["size_unit", "size"],
"properties": {
"size_unit" : {
"type" : "string",
"title" : "Events Time Interval Size Unit",
"description" : "Amplitude event stream's interval size unit",
"enum" : [
"days",
"hours",
"weeks",
"months"
],
"default" : "days"
},
"size" : {
"type" : "integer",
"title" : "Events Time Interval Size",
"description" : "Amplitude event stream's interval size unit",
"default" : 1
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,21 +182,33 @@ def test_next_page_token(self, requests_mock, stream_cls, expected):
def test_get_end_date(self, stream_cls, expected):
now = pendulum.now()
yesterday = pendulum.yesterday()
stream = stream_cls(yesterday.isoformat(), data_region="Standard Server")
stream = stream_cls(
yesterday.isoformat(),
data_region="Standard Server",
event_time_interval={"size_unit": "days", "size": 1}
)
# update expected with test values.
expected = now.strftime(stream.date_template)
assert stream._get_end_date(yesterday).strftime(stream.date_template) == expected


class TestEventsStream:
def test_parse_zip(self):
stream = Events(pendulum.now().isoformat(), data_region="Standard Server")
stream = Events(
pendulum.now().isoformat(),
data_region="Standard Server",
event_time_interval={"size_unit": "days", "size": 1}
)
expected = [{"id": 123}]
result = list(stream._parse_zip_file("unit_tests/api_data/zipped.json"))
assert expected == result

def test_stream_slices(self):
stream = Events(pendulum.now().isoformat(), data_region="Standard Server")
stream = Events(
pendulum.now().isoformat(),
data_region="Standard Server",
event_time_interval={"size_unit": "days", "size": 1}
)
now = pendulum.now()
expected = [
{
Expand All @@ -207,20 +219,32 @@ def test_stream_slices(self):
assert expected == stream.stream_slices()

def test_request_params(self):
stream = Events(pendulum.now().isoformat(), data_region="Standard Server")
stream = Events(
pendulum.now().isoformat(),
data_region="Standard Server",
event_time_interval={"size_unit": "days", "size": 1}
)
now = pendulum.now().subtract(hours=6)
slice = {"start": now.strftime(stream.date_template), "end": stream._get_end_date(now).strftime(stream.date_template)}
assert slice == stream.request_params(slice)

def test_get_updated_state(self):
stream = Events(pendulum.now().isoformat(), data_region="Standard Server")
stream = Events(
pendulum.now().isoformat(),
data_region="Standard Server",
event_time_interval={"size_unit": "days", "size": 1}
)
current_state = {"event_time": ""}
latest_record = {"event_time": "2021-05-27 11:59:53.710000"}
result = stream.get_updated_state(current_state, latest_record)
assert result == latest_record

def test_get_date_time_items_from_schema(self):
stream = Events(pendulum.now().isoformat(), data_region="Standard Server")
stream = Events(
pendulum.now().isoformat(),
data_region="Standard Server",
event_time_interval={"size_unit": "days", "size": 1}
)
expected = [
"server_received_time",
"event_time",
Expand All @@ -244,6 +268,10 @@ def test_get_date_time_items_from_schema(self):
ids=["empty_record", "transformed_record", "null_value", "empty_value"],
)
def test_date_time_to_rfc3339(self, record, expected):
stream = Events(pendulum.now().isoformat(), data_region="Standard Server")
stream = Events(
pendulum.now().isoformat(),
data_region="Standard Server",
event_time_interval={"size_unit": "days", "size": 1}
)
result = stream._date_time_to_rfc3339(record)
assert result == expected
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ def __init__(self, status_code):


def test_incremental_http_error_handler(mocker):
stream = Events(start_date="2021-01-01T00:00:00Z", data_region="Standard Server")
stream = Events(
start_date="2021-01-01T00:00:00Z",
data_region="Standard Server",
event_time_interval={"size_unit": "days", "size": 1}
)
stream_slice = stream.stream_slices()[0]

mock_response = MockRequest(404)
Expand Down
7 changes: 4 additions & 3 deletions docs/integrations/sources/amplitude.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ The Amplitude connector ideally should gracefully handle Amplitude API limitatio

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:------------------------------------------------------------------------------------------------|
| 0.1.23 | 2023-03-02 | [23087](https://github.com/airbytehq/airbyte/pull/23087) | Specified date formatting in specification |
| 0.1.22 | 2023-02-17 | [23192](https://github.com/airbytehq/airbyte/pull/23192) | Skip the stream if `start_date` is specified in the future.
| 0.1.21 | 2023-02-01 | [21888](https://github.com/airbytehq/airbyte/pull/21888) | Set `AvailabilityStrategy` for streams explicitly to `None` |
| 0.1.24 | 2023-03-28 | [21022](https://github.com/airbytehq/airbyte/pull/21022) | Enable event stream time interval selection |
| 0.1.23 | 2023-03-02 | [23087](https://github.com/airbytehq/airbyte/pull/23087) | Specified date formatting in specification |
| 0.1.22 | 2023-02-17 | [23192](https://github.com/airbytehq/airbyte/pull/23192) | Skip the stream if `start_date` is specified in the future.
| 0.1.21 | 2023-02-01 | [21888](https://github.com/airbytehq/airbyte/pull/21888) | Set `AvailabilityStrategy` for streams explicitly to `None` |
| 0.1.20 | 2023-01-27 | [21957](https://github.com/airbytehq/airbyte/pull/21957) | Handle null values and empty strings in date-time fields |
| 0.1.19 | 2022-12-09 | [19727](https://github.com/airbytehq/airbyte/pull/19727) | Remove `data_region` as required |
| 0.1.18 | 2022-12-08 | [19727](https://github.com/airbytehq/airbyte/pull/19727) | Add parameter to select region |
Expand Down