From c55f185109d2c275271e6c13655ca37fa08a5aed Mon Sep 17 00:00:00 2001 From: Ahmed Buksh <38909200+ahmed-buksh@users.noreply.github.com> Date: Tue, 31 May 2022 11:42:46 +0500 Subject: [PATCH] :rocket: Source Klaviyo: New Stream addition along with update to existing (#11685) * :rocket: flow stream added along with flow, campaign and flow message addition to event * :zap: liniting fix * :hammer: annotations updated along with update method for klaviyo * :boom: docker version updated and log added * :hammer: fixed acceptance test for klaviyo flow stream * :hammer: unused import removed * fix: flows stream has no records thus tests are failing * chore: update seed file Co-authored-by: Harshith Mullapudi --- .../resources/seed/source_definitions.yaml | 2 +- .../src/main/resources/seed/source_specs.yaml | 2 +- .../connectors/source-klaviyo/Dockerfile | 2 +- .../source-klaviyo/acceptance-test-config.yml | 1 + .../integration_tests/abnormal_state.json | 3 ++ .../integration_tests/configured_catalog.json | 15 ++++++++ .../source-klaviyo/source_klaviyo/schemas.py | 13 +++++++ .../source-klaviyo/source_klaviyo/source.py | 3 +- .../source-klaviyo/source_klaviyo/streams.py | 34 +++++++++++++++++-- docs/integrations/sources/klaviyo.md | 1 + 10 files changed, 70 insertions(+), 6 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 1af6cbe575b4b4..3e02e635e9ef7e 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -454,7 +454,7 @@ - name: Klaviyo sourceDefinitionId: 95e8cffd-b8c4-4039-968e-d32fb4a69bde dockerRepository: airbyte/source-klaviyo - dockerImageTag: 0.1.3 + dockerImageTag: 0.1.4 documentationUrl: https://docs.airbyte.io/integrations/sources/klaviyo icon: klaviyo.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 68cd9a6b800509..0ee0b7d60cd9a1 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -4282,7 +4282,7 @@ supported_destination_sync_modes: [] supported_source_sync_modes: - "append" -- dockerImage: "airbyte/source-klaviyo:0.1.3" +- dockerImage: "airbyte/source-klaviyo:0.1.4" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/klaviyo" changelogUrl: "https://docs.airbyte.io/integrations/sources/klaviyo" diff --git a/airbyte-integrations/connectors/source-klaviyo/Dockerfile b/airbyte-integrations/connectors/source-klaviyo/Dockerfile index 316f3e986371bd..82a5c6bc8d422f 100644 --- a/airbyte-integrations/connectors/source-klaviyo/Dockerfile +++ b/airbyte-integrations/connectors/source-klaviyo/Dockerfile @@ -12,5 +12,5 @@ RUN pip install . ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.3 +LABEL io.airbyte.version=0.1.4 LABEL io.airbyte.name=airbyte/source-klaviyo diff --git a/airbyte-integrations/connectors/source-klaviyo/acceptance-test-config.yml b/airbyte-integrations/connectors/source-klaviyo/acceptance-test-config.yml index a2bd973a4e9274..8294d0bf070c43 100644 --- a/airbyte-integrations/connectors/source-klaviyo/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-klaviyo/acceptance-test-config.yml @@ -13,6 +13,7 @@ tests: - config_path: "secrets/config.json" basic_read: - config_path: "secrets/config.json" + empty_streams: ['flows'] incremental: - config_path: "secrets/config.json" future_state_path: "integration_tests/abnormal_state.json" diff --git a/airbyte-integrations/connectors/source-klaviyo/integration_tests/abnormal_state.json b/airbyte-integrations/connectors/source-klaviyo/integration_tests/abnormal_state.json index 8a88bd61549a91..c9ac3cf70e4115 100644 --- a/airbyte-integrations/connectors/source-klaviyo/integration_tests/abnormal_state.json +++ b/airbyte-integrations/connectors/source-klaviyo/integration_tests/abnormal_state.json @@ -4,5 +4,8 @@ }, "global_exclusions": { "timestamp": "2120-10-10T00:00:00Z" + }, + "flows": { + "created": "2120-10-10 00:00:00" } } diff --git a/airbyte-integrations/connectors/source-klaviyo/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-klaviyo/integration_tests/configured_catalog.json index ac14b2cb00cc51..3e5f3a2f9a0a0d 100644 --- a/airbyte-integrations/connectors/source-klaviyo/integration_tests/configured_catalog.json +++ b/airbyte-integrations/connectors/source-klaviyo/integration_tests/configured_catalog.json @@ -59,6 +59,21 @@ "cursor_field": null, "destination_sync_mode": "append", "primary_key": null + }, + { + "stream": { + "name": "flows", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": null, + "default_cursor_field": null, + "source_defined_primary_key": [["id"]], + "namespace": null + }, + "sync_mode": "full_refresh", + "cursor_field": null, + "destination_sync_mode": "append", + "primary_key": null } ] } diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas.py b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas.py index 3a610b9fe6c4f5..e36c6a2c456d36 100644 --- a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas.py +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas.py @@ -104,6 +104,19 @@ class Event(BaseSchemaModel): statistic_id: str event_properties: dict person: dict + flow_id: Optional[str] + campaign_id: Optional[str] + flow_message_id: Optional[str] + + +class Flow(BaseSchemaModel): + id: str + name: str + status: str + created: datetime + updated: datetime + customer_filter: Optional[dict] + trigger: dict class GlobalExclusion(BaseSchemaModel): diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/source.py b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/source.py index 4c9d2955b620e6..3edd8f9a3a904d 100644 --- a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/source.py +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/source.py @@ -10,7 +10,7 @@ from airbyte_cdk.sources.streams import Stream from pydantic import Field from pydantic.main import BaseModel -from source_klaviyo.streams import Campaigns, Events, GlobalExclusions, Lists, Metrics +from source_klaviyo.streams import Campaigns, Events, Flows, GlobalExclusions, Lists, Metrics class ConnectorConfig(BaseModel): @@ -61,6 +61,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]: GlobalExclusions(api_key=config.api_key, start_date=config.start_date), Lists(api_key=config.api_key), Metrics(api_key=config.api_key), + Flows(api_key=config.api_key, start_date=config.start_date), ] def spec(self, *args, **kwargs) -> ConnectorSpecification: diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py index 4b6c873ac77185..67e2e14582eb41 100644 --- a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py @@ -2,13 +2,14 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # +import datetime from abc import ABC, abstractmethod from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union import pendulum import requests from airbyte_cdk.sources.streams.http import HttpStream -from source_klaviyo.schemas import Campaign, Event, GlobalExclusion, Metric, PersonList +from source_klaviyo.schemas import Campaign, Event, Flow, GlobalExclusion, Metric, PersonList class KlaviyoStream(HttpStream, ABC): @@ -99,7 +100,13 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late the current state and picks the 'most' recent cursor. This is how a stream's state is determined. Required for incremental. """ state_ts = int(current_stream_state.get(self.cursor_field, 0)) - return {self.cursor_field: max(latest_record.get(self.cursor_field), state_ts)} + latest_record = latest_record.get(self.cursor_field) + + if isinstance(latest_record, str): + latest_record = datetime.datetime.strptime(latest_record, "%Y-%m-%d %H:%M:%S") + latest_record = datetime.datetime.timestamp(latest_record) + + return {self.cursor_field: max(latest_record, state_ts)} def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: """ @@ -240,3 +247,26 @@ class Events(IncrementalKlaviyoStream): def path(self, **kwargs) -> str: return "metrics/timeline" + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + """ + :return an iterable containing each record in the response + """ + response_json = response.json() + for record in response_json.get("data", []): + flow = record["event_properties"].get("$flow") + flow_message_id = record["event_properties"].get("$message") + + record["flow_id"] = flow + record["flow_message_id"] = flow_message_id + record["campaign_id"] = flow_message_id if not flow else None + + yield record + + +class Flows(ReverseIncrementalKlaviyoStream): + schema = Flow + cursor_field = "created" + + def path(self, **kwargs) -> str: + return "flows" diff --git a/docs/integrations/sources/klaviyo.md b/docs/integrations/sources/klaviyo.md index 0c19b99bce6116..c63b02da3b25d6 100644 --- a/docs/integrations/sources/klaviyo.md +++ b/docs/integrations/sources/klaviyo.md @@ -52,5 +52,6 @@ Please follow these [steps](https://help.klaviyo.com/hc/en-us/articles/115005062 | Version | Date | Pull Request | Subject | | :------ | :-------- | :----- | :------ | +| `0.1.4` | 2022-04-15 | [11723](https://github.com/airbytehq/airbyte/issues/11723) | Enhance klaviyo source for flows stream and update to events stream. | | `0.1.3` | 2021-12-09 | [8592](https://github.com/airbytehq/airbyte/pull/8592) | Improve performance, make Global Exclusions stream incremental and enable Metrics stream. | | `0.1.2` | 2021-10-19 | [6952](https://github.com/airbytehq/airbyte/pull/6952) | Update schema validation in SAT |