Skip to content

Commit

Permalink
🚀 Source Klaviyo: New Stream addition along with update to existing (#…
Browse files Browse the repository at this point in the history
…11685)

* 🚀 flow stream added along with flow, campaign and flow message addition to event

* ⚡ liniting fix

* 🔨 annotations updated along with update method for klaviyo

* 💥 docker version updated and log added

* 🔨 fixed acceptance test for klaviyo flow stream

* 🔨 unused import removed

* fix: flows stream has no records thus tests are failing

* chore: update seed file

Co-authored-by: Harshith Mullapudi <harshithmullapudi@gmail.com>
  • Loading branch information
ahmed-buksh and harshithmullapudi committed May 31, 2022
1 parent fc73df4 commit c55f185
Show file tree
Hide file tree
Showing 10 changed files with 70 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@
- name: Klaviyo
sourceDefinitionId: 95e8cffd-b8c4-4039-968e-d32fb4a69bde
dockerRepository: airbyte/source-klaviyo
dockerImageTag: 0.1.3
dockerImageTag: 0.1.4
documentationUrl: https://docs.airbyte.io/integrations/sources/klaviyo
icon: klaviyo.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4282,7 +4282,7 @@
supported_destination_sync_modes: []
supported_source_sync_modes:
- "append"
- dockerImage: "airbyte/source-klaviyo:0.1.3"
- dockerImage: "airbyte/source-klaviyo:0.1.4"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/klaviyo"
changelogUrl: "https://docs.airbyte.io/integrations/sources/klaviyo"
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-klaviyo/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.3
LABEL io.airbyte.version=0.1.4
LABEL io.airbyte.name=airbyte/source-klaviyo
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ tests:
- config_path: "secrets/config.json"
basic_read:
- config_path: "secrets/config.json"
empty_streams: ['flows']
incremental:
- config_path: "secrets/config.json"
future_state_path: "integration_tests/abnormal_state.json"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,8 @@
},
"global_exclusions": {
"timestamp": "2120-10-10T00:00:00Z"
},
"flows": {
"created": "2120-10-10 00:00:00"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,21 @@
"cursor_field": null,
"destination_sync_mode": "append",
"primary_key": null
},
{
"stream": {
"name": "flows",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": null,
"default_cursor_field": null,
"source_defined_primary_key": [["id"]],
"namespace": null
},
"sync_mode": "full_refresh",
"cursor_field": null,
"destination_sync_mode": "append",
"primary_key": null
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,19 @@ class Event(BaseSchemaModel):
statistic_id: str
event_properties: dict
person: dict
flow_id: Optional[str]
campaign_id: Optional[str]
flow_message_id: Optional[str]


class Flow(BaseSchemaModel):
id: str
name: str
status: str
created: datetime
updated: datetime
customer_filter: Optional[dict]
trigger: dict


class GlobalExclusion(BaseSchemaModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from airbyte_cdk.sources.streams import Stream
from pydantic import Field
from pydantic.main import BaseModel
from source_klaviyo.streams import Campaigns, Events, GlobalExclusions, Lists, Metrics
from source_klaviyo.streams import Campaigns, Events, Flows, GlobalExclusions, Lists, Metrics


class ConnectorConfig(BaseModel):
Expand Down Expand Up @@ -61,6 +61,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]:
GlobalExclusions(api_key=config.api_key, start_date=config.start_date),
Lists(api_key=config.api_key),
Metrics(api_key=config.api_key),
Flows(api_key=config.api_key, start_date=config.start_date),
]

def spec(self, *args, **kwargs) -> ConnectorSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import datetime
from abc import ABC, abstractmethod
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union

import pendulum
import requests
from airbyte_cdk.sources.streams.http import HttpStream
from source_klaviyo.schemas import Campaign, Event, GlobalExclusion, Metric, PersonList
from source_klaviyo.schemas import Campaign, Event, Flow, GlobalExclusion, Metric, PersonList


class KlaviyoStream(HttpStream, ABC):
Expand Down Expand Up @@ -99,7 +100,13 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late
the current state and picks the 'most' recent cursor. This is how a stream's state is determined. Required for incremental.
"""
state_ts = int(current_stream_state.get(self.cursor_field, 0))
return {self.cursor_field: max(latest_record.get(self.cursor_field), state_ts)}
latest_record = latest_record.get(self.cursor_field)

if isinstance(latest_record, str):
latest_record = datetime.datetime.strptime(latest_record, "%Y-%m-%d %H:%M:%S")
latest_record = datetime.datetime.timestamp(latest_record)

return {self.cursor_field: max(latest_record, state_ts)}

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
"""
Expand Down Expand Up @@ -240,3 +247,26 @@ class Events(IncrementalKlaviyoStream):

def path(self, **kwargs) -> str:
return "metrics/timeline"

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
"""
:return an iterable containing each record in the response
"""
response_json = response.json()
for record in response_json.get("data", []):
flow = record["event_properties"].get("$flow")
flow_message_id = record["event_properties"].get("$message")

record["flow_id"] = flow
record["flow_message_id"] = flow_message_id
record["campaign_id"] = flow_message_id if not flow else None

yield record


class Flows(ReverseIncrementalKlaviyoStream):
schema = Flow
cursor_field = "created"

def path(self, **kwargs) -> str:
return "flows"
1 change: 1 addition & 0 deletions docs/integrations/sources/klaviyo.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,6 @@ Please follow these [steps](https://help.klaviyo.com/hc/en-us/articles/115005062

| Version | Date | Pull Request | Subject |
| :------ | :-------- | :----- | :------ |
| `0.1.4` | 2022-04-15 | [11723](https://github.com/airbytehq/airbyte/issues/11723) | Enhance klaviyo source for flows stream and update to events stream. |
| `0.1.3` | 2021-12-09 | [8592](https://github.com/airbytehq/airbyte/pull/8592) | Improve performance, make Global Exclusions stream incremental and enable Metrics stream. |
| `0.1.2` | 2021-10-19 | [6952](https://github.com/airbytehq/airbyte/pull/6952) | Update schema validation in SAT |

0 comments on commit c55f185

Please sign in to comment.