Skip to content

Commit

Permalink
feat[source-klaviyo]: added stream events detailed (#39350)
Browse files Browse the repository at this point in the history
  • Loading branch information
lazebnyi committed Jun 10, 2024
1 parent 9410f42 commit cc74a08
Show file tree
Hide file tree
Showing 9 changed files with 253 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ acceptance_tests:
basic_read:
tests:
- config_path: secrets/config.json
timeout_seconds: 7200
expect_records:
path: integration_tests/expected_records.jsonl
connection:
Expand All @@ -18,14 +19,16 @@ acceptance_tests:
full_refresh:
tests:
- config_path: secrets/config.json
- configured_catalog_path: integration_tests/configured_catalog.json
configured_catalog_path: integration_tests/configured_catalog.json
timeout_seconds: 7200
incremental:
tests:
- config_path: secrets/config.json
configured_catalog_path: integration_tests/configured_catalog.json
future_state:
future_state_path: integration_tests/abnormal_state.json
skip_comprehensive_incremental_tests: true
timeout_seconds: 7200
spec:
tests:
- spec_path: source_klaviyo/spec.json
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,5 @@
{"stream": "flows", "data": {"type": "flow", "id": "U5LCpF", "attributes": {"name": "Welcome Series - Customer v. Non-Customer", "status": "live", "archived": false, "created": "2022-05-31T07:00:28+00:00", "updated": "2022-05-31T07:00:29+00:00", "trigger_type": "Added to List"}, "relationships": {"flow-actions": {"links": {"self": "https://a.klaviyo.com/api/flows/U5LCpF/relationships/flow-actions/", "related": "https://a.klaviyo.com/api/flows/U5LCpF/flow-actions/"}}, "tags": {"links": {"self": "https://a.klaviyo.com/api/flows/U5LCpF/relationships/tags/", "related": "https://a.klaviyo.com/api/flows/U5LCpF/tags/"}}}, "links": {"self": "https://a.klaviyo.com/api/flows/U5LCpF/"}, "updated": "2022-05-31T07:00:29+00:00"}, "emitted_at": 1698938560375}
{"stream": "flows", "data": {"type": "flow", "id": "VueJfU", "attributes": {"name": "Happy Birthday Email - Standard", "status": "manual", "archived": false, "created": "2022-05-31T07:01:40+00:00", "updated": "2022-05-31T07:01:41+00:00", "trigger_type": "Date Based"}, "relationships": {"flow-actions": {"links": {"self": "https://a.klaviyo.com/api/flows/VueJfU/relationships/flow-actions/", "related": "https://a.klaviyo.com/api/flows/VueJfU/flow-actions/"}}, "tags": {"links": {"self": "https://a.klaviyo.com/api/flows/VueJfU/relationships/tags/", "related": "https://a.klaviyo.com/api/flows/VueJfU/tags/"}}}, "links": {"self": "https://a.klaviyo.com/api/flows/VueJfU/"}, "updated": "2022-05-31T07:01:41+00:00"}, "emitted_at": 1698938560376}
{"stream": "flows", "data": {"type": "flow", "id": "ShbZ4B", "attributes": {"name": "Abandoned Cart Archived", "status": "draft", "archived": true, "created": "2023-10-31T11:37:03+00:00", "updated": "2023-10-31T11:40:31+00:00", "trigger_type": "Metric"}, "relationships": {"flow-actions": {"links": {"self": "https://a.klaviyo.com/api/flows/ShbZ4B/relationships/flow-actions/", "related": "https://a.klaviyo.com/api/flows/ShbZ4B/flow-actions/"}}, "tags": {"links": {"self": "https://a.klaviyo.com/api/flows/ShbZ4B/relationships/tags/", "related": "https://a.klaviyo.com/api/flows/ShbZ4B/tags/"}}}, "links": {"self": "https://a.klaviyo.com/api/flows/ShbZ4B/"}, "updated": "2023-10-31T11:40:31+00:00"}, "emitted_at": 1698938560690}
{"stream": "events_detailed", "data": {"type": "event", "id": "57tBiWaqFjP", "attributes": {"timestamp": 1711992795, "event_properties": {"Subject": "Campaign for segment", "Email Domain": "airbyte.io", "$message": "01HTDA5JFKGMQ7E0R9X2CTCH7N", "Campaign Name": "Email Campaign - Apr 1, 2024, 8:08 PM", "$_cohort$message_send_cohort": "1711992788:01HTDA5JFKGMQ7E0R9X2CTCH7N", "$ESP": 2, "$group_ids": ["RnsiHB", "TaSce6"], "Inbox Provider": "Gsuite", "$internal": {"IpPool": "high_2"}, "$event_id": "01HTDA5JFKGMQ7E0R9X2CTCH7N:286891138066467715282929475014182851178"}, "datetime": "2024-04-01 17:33:15+00:00", "uuid": "eb8b7f80-f04d-11ee-8001-14fbe4f80cec"}, "relationships": {"profile": {"data": {"type": "profile", "id": "01F23YG492HT91MKG2R285HN5W"}, "links": {"self": "https://a.klaviyo.com/api/events/57tBiWaqFjP/relationships/profile/", "related": "https://a.klaviyo.com/api/events/57tBiWaqFjP/profile/"}}, "metric": {"data": {"type": "metric", "id": "WKHXf4", "name": "Received Email"}, "links": {"self": "https://a.klaviyo.com/api/events/57tBiWaqFjP/relationships/metric/", "related": "https://a.klaviyo.com/api/events/57tBiWaqFjP/metric/"}}}, "links": {"self": "https://a.klaviyo.com/api/events/57tBiWaqFjP/"}, "datetime": "2024-04-01 17:33:15+00:00"}, "emitted_at": 1718031788620}
{"stream": "events_detailed", "data": {"type": "event", "id": "57tAZan7RB2", "attributes": {"timestamp": 1711992796, "event_properties": {"Subject": "Campaign for segment", "Email Domain": "airbyte.io", "$message": "01HTDA5JFKGMQ7E0R9X2CTCH7N", "Campaign Name": "Email Campaign - Apr 1, 2024, 8:08 PM", "$_cohort$message_send_cohort": "1711992788:01HTDA5JFKGMQ7E0R9X2CTCH7N", "$ESP": 2, "$group_ids": ["TaSce6"], "Inbox Provider": "Gsuite", "$internal": {"IpPool": "high_2"}, "$event_id": "01HTDA5JFKGMQ7E0R9X2CTCH7N:286891138224924040311458150201270751850"}, "datetime": "2024-04-01 17:33:16+00:00", "uuid": "ec241600-f04d-11ee-8001-bc8989f079a1"}, "relationships": {"profile": {"data": {"type": "profile", "id": "01F5VTX8KP49GGQ4BG77HZ9FRH"}, "links": {"self": "https://a.klaviyo.com/api/events/57tAZan7RB2/relationships/profile/", "related": "https://a.klaviyo.com/api/events/57tAZan7RB2/profile/"}}, "metric": {"data": {"type": "metric", "id": "WKHXf4", "name": "Received Email"}, "links": {"self": "https://a.klaviyo.com/api/events/57tAZan7RB2/relationships/metric/", "related": "https://a.klaviyo.com/api/events/57tAZan7RB2/metric/"}}}, "links": {"self": "https://a.klaviyo.com/api/events/57tAZan7RB2/"}, "datetime": "2024-04-01 17:33:16+00:00"}, "emitted_at": 1718031788885}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ data:
definitionId: 95e8cffd-b8c4-4039-968e-d32fb4a69bde
connectorBuildOptions:
baseImage: docker.io/airbyte/python-connector-base:1.2.2@sha256:57703de3b4c4204bd68a7b13c9300f8e03c0189bffddaffc796f1da25d2dbea0
dockerImageTag: 2.6.4
dockerImageTag: 2.7.0
dockerRepository: airbyte/source-klaviyo
githubIssueLabel: source-klaviyo
icon: klaviyo.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "2.6.4"
version = "2.7.0"
name = "source-klaviyo"
description = "Source implementation for Klaviyo."
authors = [ "Airbyte <contact@airbyte.io>",]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

from dataclasses import dataclass
from typing import Any, Iterable, Mapping

import dpath
import requests
from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor


@dataclass
class KlaviyoIncludedFieldExtractor(DpathExtractor):
    """Record extractor that enriches primary records with JSON:API ``included`` resources.

    The primary records are read from the configured ``field_path``; the
    side-loaded resources are read from the top-level ``included`` key of the
    same response. The ``attributes`` of every included resource are merged into
    ``relationships[<included type>]["data"]`` of the matching primary records.
    """

    def extract_records(self, response: requests.Response) -> Iterable[Mapping[str, Any]]:
        """Yield the primary records of ``response`` enriched with included attributes.

        :param response: HTTP response whose decoded body holds the records.
        :return: iterable of (mutated in place) primary records.
        """
        # Evaluate and retrieve the extraction paths
        evaluated_field_paths = [field_path.eval(self.config) for field_path in self._field_path]
        target_records = self.extract_records_by_path(response, evaluated_field_paths)
        included_records = self.extract_records_by_path(response, ["included"])

        # Update target records with included records
        yield from self.update_target_records_with_included(target_records, included_records)

    @staticmethod
    def update_target_records_with_included(
        target_records: Iterable[Mapping[str, Any]], included_records: Iterable[Mapping[str, Any]]
    ) -> Iterable[Mapping[str, Any]]:
        """Merge included attributes into the target records and yield each target exactly once.

        :param target_records: primary records; may be a one-shot generator.
        :param included_records: side-loaded resources (``type``, optional ``id``,
            optional ``attributes``); may be a one-shot generator or empty.
        :return: the target records, each yielded once, updated in place.
        """
        # Both inputs are usually generators. The included resources must be
        # re-iterated for every target record, so materialize them once.
        included = list(included_records)

        # Targets drive the outer loop so that every target is yielded exactly
        # once, including when the response carries no included resources.
        for target_record in target_records:
            target_relationships = target_record.get("relationships") or {}
            for included_record in included:
                relationship = target_relationships.get(included_record["type"])
                if not isinstance(relationship, dict):
                    continue
                relationship_data = relationship.get("data")
                # "data" can be absent, null or a list (to-many relationship);
                # only a single resource identifier object can be enriched.
                if not isinstance(relationship_data, dict):
                    continue
                # When both sides expose an id, require them to match so that an
                # included resource is not merged into an unrelated record.
                included_id = included_record.get("id")
                relationship_id = relationship_data.get("id")
                if included_id is not None and relationship_id is not None and included_id != relationship_id:
                    continue
                relationship_data.update(included_record.get("attributes") or {})
            yield target_record

    def extract_records_by_path(self, response: requests.Response, field_paths: list = None) -> Iterable[Mapping[str, Any]]:
        """Yield the values found at ``field_paths`` in the decoded response body.

        :param response: HTTP response to decode with ``self.decoder``.
        :param field_paths: path segments joined with ``/`` for dpath; when empty
            or ``None`` the whole decoded body is used.
        :return: the items of the extracted value when it is a list, the value
            itself when it is any other truthy object, nothing otherwise.
        """
        response_body = self.decoder.decode(response)

        # Extract data from the response body based on the provided field paths
        if not field_paths:
            extracted_data = response_body
        else:
            field_path_str = "/".join(field_paths)  # Convert list of field paths to a single string path for dpath
            if "*" in field_path_str:
                # A wildcard may match several locations, so collect every match
                extracted_data = dpath.util.values(response_body, field_path_str)
            else:
                extracted_data = dpath.util.get(response_body, field_path_str, default=[])

        # Yield extracted data as individual records
        if isinstance(extracted_data, list):
            yield from extracted_data
        elif extracted_data:
            yield extracted_data
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,51 @@ definitions:
$parameters:
path: "lists/{{ stream_slice.id }}"

# Substream of `events`: for every parent event id it requests
# `events/{{ stream_slice.event_id }}` with `include=metric`, so that the
# metric name can be merged into the event record by the custom extractor.
events_detailed_stream:
# Docs: https://developers.klaviyo.com/en/reference/get_event
name: "events_detailed"
$ref: "#/definitions/base_semi_incremental_stream"
schema_loader:
type: InlineSchemaLoader
schema: "#/definitions/events_detailed_schema"
retriever:
$ref: "#/definitions/base_retriever"
record_selector:
type: RecordSelector
extractor:
type: CustomRecordExtractor
# NOTE(review): "inclouded" is misspelled, but it matches the module name
# imported by the unit tests; rename the module and every reference together.
class_name: source_klaviyo.components.inclouded_fields_extractor.KlaviyoIncludedFieldExtractor
field_path: ["data"]
requester:
$ref: "#/definitions/requester"
request_parameters:
# Side-load the related metric, restricted to its `name` attribute.
"include": "metric"
"fields[metric]": "name"
partition_router:
type: SubstreamPartitionRouter
parent_stream_configs:
# The parent record's `id` is exposed to this stream as `stream_slice.event_id`.
- type: ParentStreamConfig
parent_key: "id"
stream: "#/definitions/events_stream"
partition_field: "event_id"
incremental_sync:
type: DatetimeBasedCursor
cursor_field: "datetime"
datetime_format: "%Y-%m-%dT%H:%M:%S%z"
# Accepted input formats for the cursor value; the last one covers the
# space-separated form seen in the expected records ("2024-04-01 17:33:15+00:00").
cursor_datetime_formats:
- "%Y-%m-%dT%H:%M:%S.%f%z"
- "%Y-%m-%dT%H:%M:%S%z"
- "%Y-%m-%d %H:%M:%S%z"
start_datetime: "{{ config.get('start_date', '2012-01-01T00:00:00Z') }}"
transformations:
# Copy `attributes.datetime` to the top level, where the cursor field is read.
- type: AddFields
fields:
- type: AddedFieldDefinition
path: ["datetime"]
value: "{{ record.get('attributes', {}).get('datetime') }}"
$parameters:
path: "events/{{ stream_slice.event_id }}"

# Schemas
shared:
list_properties:
Expand Down Expand Up @@ -691,6 +736,78 @@ definitions:
related:
type: string

# JSON schema of the records emitted by the `events_detailed` stream.
events_detailed_schema:
$schema: "http://json-schema.org/draft-07/schema#"
type: object
additionalProperties: true
properties:
type:
type: string
id:
type: string
# Top-level copy of `attributes.datetime`, added by the stream's AddFields transformation.
datetime:
type: string
format: date-time
attributes:
type: ["null", object]
properties:
timestamp:
type: integer
# Free-form event payload; its keys are not enumerated here.
event_properties:
type: ["null", object]
additionalProperties: true
datetime:
type: string
format: date-time
uuid:
type: string
links:
type: ["null", object]
properties:
self:
type: string
relationships:
type: ["null", object]
properties:
profile:
type: ["null", object]
properties:
data:
type: ["null", object]
properties:
type:
type: string
id:
type: string
links:
type: ["null", object]
additionalProperties: true
properties:
self:
type: string
related:
type: string
metric:
type: ["null", object]
properties:
data:
type: ["null", object]
properties:
type:
type: string
id:
type: string
# Requested via `fields[metric]=name` and merged in from the included metric resource.
name:
type: string
links:
type: ["null", object]
additionalProperties: true
properties:
self:
type: string
related:
type: string

email_templates_schema:
$schema: "http://json-schema.org/draft-07/schema#"
type: object
Expand Down Expand Up @@ -796,6 +913,7 @@ streams:
- "#/definitions/profiles_stream"
- "#/definitions/global_exclusions_stream"
- "#/definitions/events_stream"
- "#/definitions/events_detailed_stream"
- "#/definitions/email_templates_stream"

# Semi-Incremental streams
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from unittest.mock import Mock, patch

import pytest
from requests.models import Response
from source_klaviyo.components.inclouded_fields_extractor import KlaviyoIncludedFieldExtractor


@pytest.fixture
def mock_response():
    """Provide a mock constrained to the ``requests`` ``Response`` interface."""
    response = Mock(spec=Response)
    return response


@pytest.fixture
def mock_decoder():
    """Provide a mock decoder whose ``decode`` return value each test configures."""
    decoder = Mock()
    return decoder


@pytest.fixture
def mock_config():
    """Provide a mock standing in for the connector configuration."""
    config = Mock()
    return config


@pytest.fixture
def mock_field_path():
    """Provide a field path made of two independent mock segments."""
    segments = []
    for _ in range(2):
        segments.append(Mock())
    return segments


@pytest.fixture
def extractor(mock_config, mock_field_path, mock_decoder):
    """Build the extractor under test, wired to the mocked field path, config and decoder."""
    # Use keyword arguments: in the CDK's DpathExtractor dataclass the third
    # positional field is the `parameters` InitVar, so passing the decoder
    # positionally would leave the default decoder in place and the mock unused.
    return KlaviyoIncludedFieldExtractor(
        field_path=mock_field_path,
        config=mock_config,
        parameters={},
        decoder=mock_decoder,
    )


@patch('dpath.util.get')
@patch('dpath.util.values')
def test_extract_records_by_path(mock_values, mock_get, extractor, mock_response, mock_decoder):
    """Check ``extract_records_by_path`` for a path that resolves and for one that does not."""
    # Stub the dpath lookups and the decoded body for the "value found" case.
    mock_values.return_value = [{'key': 'value'}]
    mock_get.return_value = {'key': 'value'}
    mock_decoder.decode.return_value = {'data': 'value'}

    found = [record for record in extractor.extract_records_by_path(mock_response, ['data'])]
    assert found == [{'key': 'value'}]

    # Nothing is found at the path: no records must be produced.
    mock_values.return_value = []
    mock_get.return_value = None
    missing = [record for record in extractor.extract_records_by_path(mock_response, ['included'])]
    assert missing == []


def test_update_target_records_with_included(extractor):
    """Check that included attributes are merged into the matching relationship data."""
    included_attributes = {'key': 'value'}
    targets = [{'relationships': {'type1': {'data': {}}}}]
    included = [{'type': 'type1', 'attributes': included_attributes}]

    merged = [record for record in extractor.update_target_records_with_included(targets, included)]

    first_record = merged[0]
    assert first_record['relationships']['type1']['data'] == {'key': 'value'}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def test_streams():
source = SourceKlaviyo()
config = {"api_key": "some_key", "start_date": pendulum.datetime(2020, 10, 10).isoformat()}
streams = source.streams(config)
expected_streams_number = 10
expected_streams_number = 11
assert len(streams) == expected_streams_number

# ensure only unique stream names are returned
Expand Down
16 changes: 10 additions & 6 deletions docs/integrations/sources/klaviyo.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ The Klaviyo source connector supports the following [sync modes](https://docs.ai
- [Campaigns Detailed](https://developers.klaviyo.com/en/v2023-06-15/reference/get_campaigns)
- [Email Templates](https://developers.klaviyo.com/en/reference/get_templates)
- [Events](https://developers.klaviyo.com/en/reference/get_events)
- [Events Detailed](https://developers.klaviyo.com/en/reference/get_event)
- [Flows](https://developers.klaviyo.com/en/reference/get_flows)
- [GlobalExclusions](https://developers.klaviyo.com/en/v2023-02-22/reference/get_profiles)
- [Lists](https://developers.klaviyo.com/en/reference/get_lists)
Expand All @@ -56,14 +57,16 @@ Stream `Campaigns Detailed` contains fields `estimated_recipient_count` and `cam

Stream `Lists Detailed` contains field `profile_count` in addition to info from the `Lists` stream. Additional time is needed to fetch extra data due to Klaviyo API [limitation](https://developers.klaviyo.com/en/reference/get_list).

Stream `Events Detailed` contains the `name` field for the `metric` relationship in addition to the [info](https://developers.klaviyo.com/en/reference/get_event) returned for each event.

## Data type map

| Integration Type | Airbyte Type | Notes |
| :--------------- | :----------- | :---- |
| `string` | `string` | |
| `number` | `number` | |
| `array` | `array` | |
| `object` | `object` | |
| Integration Type | Airbyte Type |
|:-----------------|:-------------|
| `string` | `string` |
| `number` | `number` |
| `array` | `array` |
| `object` | `object` |

## Changelog

Expand All @@ -72,6 +75,7 @@ Stream `Lists Detailed` contains field `profile_count` in addition to info from

| Version | Date | Pull Request | Subject |
|:---------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------|
| `2.7.0` | 2024-06-08 | [39350](https://github.com/airbytehq/airbyte/pull/39350) | Add `events_detailed` stream |
| `2.6.4` | 2024-06-06 | [38879](https://github.com/airbytehq/airbyte/pull/38879) | Implement `CheckpointMixin` for handling state in Python streams |
| `2.6.3` | 2024-06-04 | [38935](https://github.com/airbytehq/airbyte/pull/38935) | [autopull] Upgrade base image to v1.2.1 |
| `2.6.2` | 2024-05-08 | [37789](https://github.com/airbytehq/airbyte/pull/37789) | Move stream schemas and spec to manifest |
Expand Down

0 comments on commit cc74a08

Please sign in to comment.