Skip to content

Commit

Permalink
✨Source Klaviyo: make start_date optional (#31710)
Browse files Browse the repository at this point in the history
  • Loading branch information
askarpets committed Oct 24, 2023
1 parent ff2fcf8 commit d53e542
Show file tree
Hide file tree
Showing 8 changed files with 143 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ data:
definitionId: 95e8cffd-b8c4-4039-968e-d32fb4a69bde
connectorBuildOptions:
baseImage: docker.io/airbyte/python-connector-base:1.1.0@sha256:bd98f6505c6764b1b5f99d3aedc23dfc9e9af631a62533f60eb32b1d3dbab20c
dockerImageTag: 1.0.0
dockerImageTag: 1.1.0
dockerRepository: airbyte/source-klaviyo
githubIssueLabel: source-klaviyo
icon: klaviyo.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
#

import re
from http import HTTPStatus
from typing import Any, List, Mapping, Tuple

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from requests.exceptions import HTTPError
from source_klaviyo.streams import Campaigns, EmailTemplates, Events, Flows, GlobalExclusions, Lists, Metrics, Profiles


Expand All @@ -21,6 +23,12 @@ def check_connection(self, logger, config: Mapping[str, Any]) -> Tuple[bool, Any
try:
# we use metrics endpoint because it never returns an error
_ = list(Metrics(api_key=config["api_key"]).read_records(sync_mode=SyncMode.full_refresh))
except HTTPError as e:
if e.response.status_code in (HTTPStatus.FORBIDDEN, HTTPStatus.UNAUTHORIZED):
message = "Please provide a valid API key and make sure it has permissions to read specified streams."
else:
message = "Unable to connect to Klaviyo API with provided credentials."
return False, message
except Exception as e:
original_error_message = repr(e)

Expand All @@ -39,7 +47,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
:param config: A Mapping of the user input configuration as defined in the connector spec.
"""
api_key = config["api_key"]
start_date = config["start_date"]
start_date = config.get("start_date")
return [
Campaigns(api_key=api_key),
Events(api_key=api_key, start_date=start_date),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,19 @@
"title": "Api Key",
"description": "Klaviyo API Key. See our <a href=\"https://docs.airbyte.com/integrations/sources/klaviyo\">docs</a> if you need help finding this key.",
"airbyte_secret": true,
"type": "string"
"type": "string",
"order": 0
},
"start_date": {
"title": "Start Date",
"description": "UTC date and time in the format 2017-01-25T00:00:00Z. Any data before this date will not be replicated.",
"description": "UTC date and time in the format 2017-01-25T00:00:00Z. Any data before this date will not be replicated. This field is optional - if not provided, all data will be replicated.",
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$",
"examples": ["2017-01-25T00:00:00Z"],
"type": "string",
"format": "date-time"
"format": "date-time",
"order": 1
}
},
"required": ["api_key", "start_date"]
"required": ["api_key"]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def map_record(self, record: Mapping):
class IncrementalKlaviyoStreamLatest(KlaviyoStreamLatest, ABC):
"""Base class for all incremental streams, requires cursor_field to be declared"""

def __init__(self, start_date: str, **kwargs):
def __init__(self, start_date: Optional[str], **kwargs):
super().__init__(**kwargs)
self._start_ts = start_date

Expand All @@ -103,11 +103,13 @@ def request_params(self, stream_state: Mapping[str, Any] = None, next_page_token
params = super().request_params(stream_state=stream_state, next_page_token=next_page_token, **kwargs)

if not params.get("filter"):
latest_cursor = pendulum.parse(self._start_ts)
stream_state_cursor_value = stream_state.get(self.cursor_field)
if stream_state_cursor_value:
latest_cursor = max(latest_cursor, pendulum.parse(stream_state[self.cursor_field]))
params["filter"] = "greater-than(" + self.cursor_field + "," + latest_cursor.isoformat() + ")"
latest_cursor = self._start_ts or stream_state_cursor_value
if latest_cursor:
latest_cursor = pendulum.parse(latest_cursor)
if stream_state_cursor_value:
latest_cursor = max(latest_cursor, pendulum.parse(stream_state_cursor_value))
params["filter"] = f"greater-than({self.cursor_field},{latest_cursor.isoformat()})"
params["sort"] = self.cursor_field
return params

Expand All @@ -118,8 +120,9 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late
Required for incremental.
"""
current_stream_cursor_value = current_stream_state.get(self.cursor_field, self._start_ts)
latest_record_cursor_value = latest_record[self.cursor_field]
latest_cursor = max(pendulum.parse(latest_record_cursor_value), pendulum.parse(current_stream_cursor_value))
latest_cursor = pendulum.parse(latest_record[self.cursor_field])
if current_stream_cursor_value:
latest_cursor = max(latest_cursor, pendulum.parse(current_stream_cursor_value))
return {self.cursor_field: latest_cursor.isoformat()}


Expand Down Expand Up @@ -193,9 +196,9 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
class IncrementalKlaviyoStreamV1(KlaviyoStreamV1, ABC):
"""Base class for all incremental streams, requires cursor_field to be declared"""

def __init__(self, start_date: str, **kwargs):
def __init__(self, start_date: Optional[str], **kwargs):
super().__init__(**kwargs)
self._start_ts = int(pendulum.parse(start_date).timestamp())
self._start_ts = int(pendulum.parse(start_date).timestamp()) if start_date else 0
self._start_sync = int(pendulum.now().timestamp())

@property
Expand Down Expand Up @@ -254,9 +257,9 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str,
class ReverseIncrementalKlaviyoStreamV1(KlaviyoStreamV1, ABC):
"""Base class for all streams that are natively incremental but support both desc & asc order"""

def __init__(self, start_date: str, **kwargs):
def __init__(self, start_date: Optional[str], **kwargs):
super().__init__(**kwargs)
self._start_datetime = pendulum.parse(start_date)
self._start_datetime = pendulum.parse(start_date) if start_date else None
self._reversed = False
self._reached_old_records = False
self._low_boundary = None
Expand All @@ -280,7 +283,9 @@ def request_params(self, stream_state=None, **kwargs):
stream_state = stream_state or {}
if stream_state:
self._reversed = True
self._low_boundary = max(pendulum.parse(stream_state[self.cursor_field]), self._start_datetime)
self._low_boundary = pendulum.parse(stream_state[self.cursor_field])
if self._start_datetime:
self._low_boundary = max(pendulum.parse(stream_state[self.cursor_field]), self._start_datetime)
params = super().request_params(stream_state=stream_state, **kwargs)
params["sort"] = "desc" if self._reversed else "asc"

Expand Down Expand Up @@ -317,13 +322,11 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
""":return an iterable containing each record in the response"""

for record in super().parse_response(response=response, **kwargs):
if self._reversed:
if pendulum.parse(record[self.cursor_field]) < self._low_boundary:
self._reached_old_records = True
continue
else:
if pendulum.parse(record[self.cursor_field]) < self._start_datetime:
continue
if self._reversed and pendulum.parse(record[self.cursor_field]) < self._low_boundary:
self._reached_old_records = True
continue
elif self._start_datetime and pendulum.parse(record[self.cursor_field]) < self._start_datetime:
continue
yield record


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,18 +113,18 @@ def test_availability_strategy(self):

class TestIncrementalKlaviyoStreamLatest:
api_key = "some_key"
start_date = START_DATE.isoformat()

def test_cursor_field_is_required(self):
with pytest.raises(
TypeError, match="Can't instantiate abstract class IncrementalKlaviyoStreamLatest with abstract methods cursor_field, path"
):
IncrementalKlaviyoStreamLatest(api_key=self.api_key, start_date=self.start_date)
IncrementalKlaviyoStreamLatest(api_key=self.api_key, start_date=START_DATE.isoformat())

@pytest.mark.parametrize(
("stream_state_date", "next_page_token", "expected_params"),
("config_start_date", "stream_state_date", "next_page_token", "expected_params"),
(
(
START_DATE.isoformat(),
{"updated": "2023-01-01T00:00:00+00:00"},
{"page[cursor]": "aaA0aAo0aAA0AaAaAaa0AaaAAAaaA00AAAa0AA00A0AAAaAa"},
{
Expand All @@ -134,6 +134,7 @@ def test_cursor_field_is_required(self):
},
),
(
START_DATE.isoformat(),
None,
{"page[cursor]": "aaA0aAo0aAA0AaAaAaa0AaaAAAaaA00AAAa0AA00A0AAAaAa"},
{
Expand All @@ -143,28 +144,52 @@ def test_cursor_field_is_required(self):
},
),
(
START_DATE.isoformat(),
None,
{"filter": "some_filter"},
{"filter": "some_filter"},
),
(
None,
None,
{"page[cursor]": "aaA0aAo0aAA0AaAaAaa0AaaAAAaaA00AAAa0AA00A0AAAaAa"},
{
"page[cursor]": "aaA0aAo0aAA0AaAaAaa0AaaAAAaaA00AAAa0AA00A0AAAaAa",
"sort": "updated",
},
),
(
None,
{"updated": "2023-01-01T00:00:00+00:00"},
{"page[cursor]": "aaA0aAo0aAA0AaAaAaa0AaaAAAaaA00AAAa0AA00A0AAAaAa"},
{
"filter": "greater-than(updated,2023-01-01T00:00:00+00:00)",
"page[cursor]": "aaA0aAo0aAA0AaAaAaa0AaaAAAaaA00AAAa0AA00A0AAAaAa",
"sort": "updated",
},
),
),
)
def test_request_params(self, stream_state_date, next_page_token, expected_params):
stream = SomeIncrementalStream(api_key=self.api_key, start_date=self.start_date)
def test_request_params(self, config_start_date, stream_state_date, next_page_token, expected_params):
stream = SomeIncrementalStream(api_key=self.api_key, start_date=config_start_date)
inputs = {"stream_state": stream_state_date, "next_page_token": next_page_token}
assert stream.request_params(**inputs) == expected_params

@pytest.mark.parametrize(
("current_cursor", "latest_cursor", "expected_cursor"),
("config_start_date", "current_cursor", "latest_cursor", "expected_cursor"),
(
("2023-01-01T00:00:00+00:00", "2023-01-02T00:00:00+00:00", "2023-01-02T00:00:00+00:00"),
("2023-01-02T00:00:00+00:00", "2023-01-01T00:00:00+00:00", "2023-01-02T00:00:00+00:00"),
(START_DATE.isoformat(), "2023-01-01T00:00:00+00:00", "2023-01-02T00:00:00+00:00", "2023-01-02T00:00:00+00:00"),
(START_DATE.isoformat(), "2023-01-02T00:00:00+00:00", "2023-01-01T00:00:00+00:00", "2023-01-02T00:00:00+00:00"),
(START_DATE.isoformat(), None, "2019-01-01T00:00:00+00:00", "2020-10-10T00:00:00+00:00"),
(None, "2020-10-10T00:00:00+00:00", "2019-01-01T00:00:00+00:00", "2020-10-10T00:00:00+00:00"),
(None, None, "2019-01-01T00:00:00+00:00", "2019-01-01T00:00:00+00:00"),
),
)
def test_get_updated_state(self, current_cursor, latest_cursor, expected_cursor):
stream = SomeIncrementalStream(api_key=self.api_key, start_date=self.start_date)
def test_get_updated_state(self, config_start_date, current_cursor, latest_cursor, expected_cursor):
stream = SomeIncrementalStream(api_key=self.api_key, start_date=config_start_date)
inputs = {
"current_stream_state": {stream.cursor_field: current_cursor},
# {"key": "value"} is needed to mimic the case when current_stream_state doesn't have cursor key
"current_stream_state": {stream.cursor_field: current_cursor} if current_cursor else {"key": "value"},
"latest_record": {stream.cursor_field: latest_cursor},
}
assert stream.get_updated_state(**inputs) == {stream.cursor_field: expected_cursor}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
400,
"Bad request",
False,
"HTTPError('400 Client Error: None for url: https://a.klaviyo.com/api/v1/metrics?api_key=***&count=100')",
"Unable to connect to Klaviyo API with provided credentials.",
),
(
403,
"Forbidden",
False,
"HTTPError('403 Client Error: None for url: https://a.klaviyo.com/api/v1/metrics?api_key=***&count=100')",
"Please provide a valid API key and make sure it has permissions to read specified streams.",
),
),
)
Expand All @@ -38,6 +38,18 @@ def test_check_connection(requests_mock, status_code, response, is_connection_su
assert error == error_msg


def test_check_connection_unexpected_error(requests_mock):
    """A generic exception raised during the check is reported with the API key masked."""
    config = {"api_key": "api_key"}
    metrics_url = "https://a.klaviyo.com/api/v1/metrics?api_key=api_key&count=100"
    # The error text deliberately embeds a key so the test can verify it is replaced by ***.
    failure = Exception("Something went wrong, api_key=some_api_key")
    requests_mock.register_uri("GET", metrics_url, exc=failure)

    is_connected, reason = SourceKlaviyo().check_connection(logger=None, config=config)

    assert is_connected is False
    assert reason == "Exception('Something went wrong, api_key=***')"


def test_streams():
source = SourceKlaviyo()
config = {"api_key": "some_key", "start_date": pendulum.datetime(2020, 10, 10).isoformat()}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,40 +92,56 @@ def test_cursor_field_is_required(self):
IncrementalKlaviyoStreamV1(api_key="some_key", start_date=START_DATE.isoformat())

@pytest.mark.parametrize(
["next_page_token", "stream_state", "expected_params"],
["config_start_date", "next_page_token", "stream_state", "expected_params"],
[
# start with start_date
(None, {}, {"api_key": "some_key", "count": 100, "sort": "asc", "since": START_DATE.int_timestamp}),
(START_DATE.isoformat(), None, {}, {"api_key": "some_key", "count": 100, "sort": "asc", "since": START_DATE.int_timestamp}),
# pagination overrules start_date
({"since": 123}, {}, {"api_key": "some_key", "count": 100, "sort": "asc", "since": 123}),
(START_DATE.isoformat(), {"since": 123}, {}, {"api_key": "some_key", "count": 100, "sort": "asc", "since": 123}),
# start_date overrules state if state < start_date
(
START_DATE.isoformat(),
None,
{"updated_at": START_DATE.int_timestamp - 1},
{"api_key": "some_key", "count": 100, "sort": "asc", "since": START_DATE.int_timestamp},
),
# but pagination still overrules
(
START_DATE.isoformat(),
{"since": 123},
{"updated_at": START_DATE.int_timestamp - 1},
{"api_key": "some_key", "count": 100, "sort": "asc", "since": 123},
),
# and again
(
START_DATE.isoformat(),
{"since": 123},
{"updated_at": START_DATE.int_timestamp + 1},
{"api_key": "some_key", "count": 100, "sort": "asc", "since": 123},
),
# finally state > start_date and can be used
(
START_DATE.isoformat(),
None,
{"updated_at": START_DATE.int_timestamp + 1},
{"api_key": "some_key", "count": 100, "sort": "asc", "since": START_DATE.int_timestamp + 1},
),
(
None,
None,
{"updated_at": START_DATE.int_timestamp + 1},
{"api_key": "some_key", "count": 100, "sort": "asc", "since": START_DATE.int_timestamp + 1},
),
(
None,
None,
None,
{"api_key": "some_key", "count": 100, "sort": "asc", "since": 0},
),
],
)
def test_request_params(self, next_page_token, stream_state, expected_params):
stream = SomeIncrementalStream(api_key="some_key", start_date=START_DATE.isoformat())
def test_request_params(self, config_start_date, next_page_token, stream_state, expected_params):
stream = SomeIncrementalStream(api_key="some_key", start_date=config_start_date)
result = stream.request_params(stream_state=stream_state, next_page_token=next_page_token)

assert result == expected_params
Expand Down

0 comments on commit d53e542

Please sign in to comment.