diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index a27dc6b71a699..e0f7782d743a1 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -1895,10 +1895,10 @@ - name: Zenloop sourceDefinitionId: f1e4c7f6-db5c-4035-981f-d35ab4998794 dockerRepository: airbyte/source-zenloop - dockerImageTag: 0.1.3 + dockerImageTag: 0.1.4 documentationUrl: https://docs.airbyte.com/integrations/sources/zenloop sourceType: api - releaseStage: alpha + releaseStage: beta - sourceDefinitionId: cdaf146a-9b75-49fd-9dd2-9d64a0bb4781 name: Sentry dockerRepository: airbyte/source-sentry diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index d965aa05df151..1ea4d560e5a7b 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -16433,7 +16433,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-zenloop:0.1.3" +- dockerImage: "airbyte/source-zenloop:0.1.4" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/zenloop" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-zenloop/Dockerfile b/airbyte-integrations/connectors/source-zenloop/Dockerfile index 9e50a910966cb..6d0f8bccdcaf5 100644 --- a/airbyte-integrations/connectors/source-zenloop/Dockerfile +++ b/airbyte-integrations/connectors/source-zenloop/Dockerfile @@ -34,5 +34,5 @@ COPY source_zenloop ./source_zenloop ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.3 +LABEL io.airbyte.version=0.1.4 LABEL io.airbyte.name=airbyte/source-zenloop diff --git a/airbyte-integrations/connectors/source-zenloop/README.md b/airbyte-integrations/connectors/source-zenloop/README.md index c93616b0c476f..c9b0f0457444c 100644 --- a/airbyte-integrations/connectors/source-zenloop/README.md +++ b/airbyte-integrations/connectors/source-zenloop/README.md @@ -82,7 +82,7 @@ docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integrat Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named. First install test dependencies into your virtual environment: ``` -pip install .[tests] +pip install .'[tests]' ``` ### Unit Tests To run unit tests locally, from the connector directory run: diff --git a/airbyte-integrations/connectors/source-zenloop/acceptance-test-config.yml b/airbyte-integrations/connectors/source-zenloop/acceptance-test-config.yml index c179240af6912..0b198ce0e4ae7 100644 --- a/airbyte-integrations/connectors/source-zenloop/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-zenloop/acceptance-test-config.yml @@ -13,31 +13,16 @@ tests: - config_path: "secrets/config.json" basic_read: - config_path: "secrets/config.json" - configured_catalog_path: "integration_tests/configured_catalog_answers.json" - empty_streams: [] - - config_path: "secrets/config.json" - configured_catalog_path: "integration_tests/configured_catalog_surveys.json" - empty_streams: [] - - config_path: "secrets/config.json" - configured_catalog_path: "integration_tests/configured_catalog_answers_survey_group.json" - empty_streams: [] - - config_path: "secrets/config.json" - configured_catalog_path: "integration_tests/configured_catalog_survey_groups.json" - empty_streams: [] - - config_path: "secrets/config.json" - configured_catalog_path: "integration_tests/configured_catalog_properties.json" - empty_streams: [] + configured_catalog_path: "integration_tests/configured_catalog.json" + expect_records: + path: "integration_tests/expected_records.txt" + extra_fields: no + exact_order: no + extra_records: yes incremental: - config_path: "secrets/config.json" - configured_catalog_path: "integration_tests/configured_catalog_answers.json" - future_state_path: "integration_tests/abnormal_state.json" - - config_path: "secrets/config.json" - configured_catalog_path: "integration_tests/configured_catalog_answers_survey_group.json" + configured_catalog_path: "integration_tests/configured_catalog.json" future_state_path: "integration_tests/abnormal_state.json" full_refresh: - config_path: "secrets/config.json" - configured_catalog_path: "integration_tests/configured_catalog_survey_groups.json" - - config_path: "secrets/config.json" - configured_catalog_path: "integration_tests/configured_catalog_surveys.json" - - config_path: "secrets/config.json" - configured_catalog_path: "integration_tests/configured_catalog_properties.json" + configured_catalog_path: "integration_tests/configured_catalog.json" diff --git a/airbyte-integrations/connectors/source-zenloop/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-zenloop/integration_tests/configured_catalog.json new file mode 100644 index 0000000000000..da5f22c5b76b7 --- /dev/null +++ b/airbyte-integrations/connectors/source-zenloop/integration_tests/configured_catalog.json @@ -0,0 +1,49 @@ +{ + "streams": [ + { + "stream": { + "name": "answers", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "surveys", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "survey_groups", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "answers_survey_group", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "properties", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + } + ] +} diff --git a/airbyte-integrations/connectors/source-zenloop/integration_tests/expected_records.txt b/airbyte-integrations/connectors/source-zenloop/integration_tests/expected_records.txt new file mode 100644 index 0000000000000..c2ecfd27e042c --- /dev/null +++ b/airbyte-integrations/connectors/source-zenloop/integration_tests/expected_records.txt @@ -0,0 +1,10 @@ +{"stream": "answers", "data": {"additional_answers": [], "additional_questions": {}, "comment": "Airbyte is amazing", "email": "sajarin@airbyte.io", "id": "TVRrMlptWmtPRGd0TWprMFl5MDBZbUkwTFRrd01XWXROVE5pWldNMU56WXpNbVV4", "identity": "sajarin@airbyte.io", "identity_type": "email", "inserted_at": "2022-09-19T08:46:22.373163Z", "labels": [], "labels_with_keywords": {}, "metatags": {}, "name": "Sajarin Dider", "property_ids": [348043522, 348043523], "recipient_id": "TVRFMU1qTXhaV1F0Wm1Oa01TMDBOMkUzTFRsbE1ETXRObVEzTjJNd1lqWmpZMkpp", "score": 10, "score_type": "promoter", "sentiment": "positive", "sentiment_per_label_name": {}, "translated_comment": null}, "emitted_at": 1670592045869} +{"stream": "answers", "data": {"additional_answers": [], "additional_questions": {}, "comment": "I love airbyte", "email": "integration-test@airbyte.io", "id": "TXpCaU5tRmtNekl0T1dKa09TMDBPRGd5TFdKbFlURXROalF6WkRkbFlqVmhaR0kw", "identity": "integration-test@airbyte.io", "identity_type": "email", "inserted_at": "2022-09-19T08:46:14.348616Z", "labels": [], "labels_with_keywords": {}, "metatags": {}, "name": "Test Account", "property_ids": [348044048, 348043523], "recipient_id": "TkdOaU5HRTVOMlV0WWpReE1TMDBNRGxrTFdJeU9UUXROVGcxTVRCbE5UVXhaakpo", "score": 10, "score_type": "promoter", "sentiment": "positive", "sentiment_per_label_name": {}, "translated_comment": null}, "emitted_at": 1670592045874} +{"stream": "surveys", "data": {"inserted_at": "2022-09-19T08:36:26Z", "public_hash_id": "WlRBek9ESTFNREl0TmpJMk9DMDBOR0V4TFRoaE16UXRZV1UyWW1SbU56WTNPVGRs", "status": "active", "title": "New Survey 2022-09-19 08:36:26.262267"}, "emitted_at": 1670592046346} +{"stream": "survey_groups", "data": {"inserted_at": "2021-11-09T13:06:59Z", "name": "All Surveys & Survey Groups", "public_hash_id": "WmpGa1ltTmlZbVl0TWpGa015MDBOemhsTFdKbE1XSXRaV05sTXpnMk9USmlOalZp", "surveys": [{"inserted_at": "2022-09-19T08:36:26Z", "public_hash_id": "WlRBek9ESTFNREl0TmpJMk9DMDBOR0V4TFRoaE16UXRZV1UyWW1SbU56WTNPVGRs", "status": "active", "title": "New Survey 2022-09-19 08:36:26.262267"}]}, "emitted_at": 1670592046589} +{"stream": "survey_groups", "data": {"inserted_at": "2022-09-19T08:48:21Z", "name": "Test Group", "public_hash_id": "TnpKaE1UVmhObUV0WkdFME15MDBZMkUyTFRsalpXRXROamt5TkRVd05EZzVOelEy", "surveys": [{"inserted_at": "2022-09-19T08:36:26Z", "public_hash_id": "WlRBek9ESTFNREl0TmpJMk9DMDBOR0V4TFRoaE16UXRZV1UyWW1SbU56WTNPVGRs", "status": "active", "title": "New Survey 2022-09-19 08:36:26.262267"}]}, "emitted_at": 1670592046592} +{"stream": "answers_survey_group", "data": {"additional_questions": {}, "comment": "Airbyte is amazing", "email": "sajarin@airbyte.io", "id": "TVRrMlptWmtPRGd0TWprMFl5MDBZbUkwTFRrd01XWXROVE5pWldNMU56WXpNbVV4", "identity": "sajarin@airbyte.io", "identity_type": "email", "inserted_at": "2022-09-19T08:46:22.373163Z", "labels": [], "labels_with_keywords": {}, "metatags": {}, "name": "Sajarin Dider", "property_ids": [348043522, 348043523], "recipient_id": "TVRFMU1qTXhaV1F0Wm1Oa01TMDBOMkUzTFRsbE1ETXRObVEzTjJNd1lqWmpZMkpp", "score": 10, "score_type": "promoter", "sentiment": "positive", "sentiment_per_label_name": {}, "survey_public_hash_id": "WlRBek9ESTFNREl0TmpJMk9DMDBOR0V4TFRoaE16UXRZV1UyWW1SbU56WTNPVGRs", "translated_comment": null}, "emitted_at": 1670592046844} +{"stream": "answers_survey_group", "data": {"additional_questions": {}, "comment": "I love airbyte", "email": "integration-test@airbyte.io", "id": "TXpCaU5tRmtNekl0T1dKa09TMDBPRGd5TFdKbFlURXROalF6WkRkbFlqVmhaR0kw", "identity": "integration-test@airbyte.io", "identity_type": "email", "inserted_at": "2022-09-19T08:46:14.348616Z", "labels": [], "labels_with_keywords": {}, "metatags": {}, "name": "Test Account", "property_ids": [348044048, 348043523], "recipient_id": "TkdOaU5HRTVOMlV0WWpReE1TMDBNRGxrTFdJeU9UUXROVGcxTVRCbE5UVXhaakpo", "score": 10, "score_type": "promoter", "sentiment": "positive", "sentiment_per_label_name": {}, "survey_public_hash_id": "WlRBek9ESTFNREl0TmpJMk9DMDBOR0V4TFRoaE16UXRZV1UyWW1SbU56WTNPVGRs", "translated_comment": null}, "emitted_at": 1670592046851} +{"stream": "properties", "data": {"id": "WldSa1pUTmtaVGN0TWprNU5pMDBPVEUyTFdGbE9XSXRPVEkwTVRjM1lqUXlNMlU0", "name": "gender", "value": "agender"}, "emitted_at": 1670592047292} +{"stream": "properties", "data": {"id": "WWpsa1pUUmlaVFl0Tmpoa055MDBZV0l3TFRsbE9USXRaV1pqWVdFMFpESTFNR1E0", "name": "gender", "value": "male"}, "emitted_at": 1670592047295} +{"stream": "properties", "data": {"id": "TldKak1qQTNaRFF0WWpVeU5DMDBaVEpqTFdJNFkyVXRPVFJqTldFNU9Ea3haRFps", "name": "country", "value": "US"}, "emitted_at": 1670592047297} diff --git a/airbyte-integrations/connectors/source-zenloop/source_zenloop/components.py b/airbyte-integrations/connectors/source-zenloop/source_zenloop/components.py new file mode 100644 index 0000000000000..3504431458264 --- /dev/null +++ b/airbyte-integrations/connectors/source-zenloop/source_zenloop/components.py @@ -0,0 +1,34 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + + +from dataclasses import dataclass +from typing import Iterable + +from airbyte_cdk.models import SyncMode +from airbyte_cdk.sources.declarative.stream_slicers.substream_slicer import SubstreamSlicer +from airbyte_cdk.sources.declarative.types import StreamSlice, StreamState + + +@dataclass +class ZenloopSubstreamSlicer(SubstreamSlicer): + def stream_slices(self, sync_mode: SyncMode, stream_state: StreamState) -> Iterable[StreamSlice]: + """ + + config_parent_field : parent field name in config + + Use parent id's as stream state value if it specified in config or + create stream_slices according SubstreamSlicer logic. + + """ + config = self._options.get("config") + parent_field = self._options.get("config_parent_field") + custom_stream_state_value = config.get(parent_field) + + if not custom_stream_state_value: + yield from super().stream_slices(sync_mode, stream_state) + else: + for parent_stream_config in self.parent_stream_configs: + stream_state_field = parent_stream_config.stream_slice_field or None + yield {stream_state_field: custom_stream_state_value, "parent_slice": {}} diff --git a/airbyte-integrations/connectors/source-zenloop/source_zenloop/source.py b/airbyte-integrations/connectors/source-zenloop/source_zenloop/source.py index 6631a614895d4..d3eeb2cf46c14 100644 --- a/airbyte-integrations/connectors/source-zenloop/source_zenloop/source.py +++ b/airbyte-integrations/connectors/source-zenloop/source_zenloop/source.py @@ -2,253 +2,16 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # +from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource -import math -from abc import ABC -from datetime import datetime, timedelta -from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple +""" +This file provides the necessary constructs to interpret a provided declarative YAML configuration file into +source connector. +WARNING: Do not modify this file. +""" -import requests -from airbyte_cdk.sources import AbstractSource -from airbyte_cdk.sources.streams import Stream -from airbyte_cdk.sources.streams.http import HttpStream -from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator - -class ZenloopStream(HttpStream, ABC): - - url_base = "https://api.zenloop.com/v1/" - extra_params = None - has_date_param = False - - def __init__(self, api_token: str, date_from: Optional[str], survey_id, survey_group_id: Optional[str], **kwargs): - super().__init__(authenticator=api_token) - self.api_token = api_token - self.date_from = date_from or datetime.today().strftime("%Y-%m-%d") - self.survey_id = survey_id or None - self.survey_group_id = survey_group_id or None - - def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: - decoded_response = response.json() - page = decoded_response["meta"]["page"] - per_page = decoded_response["meta"]["per_page"] - total = decoded_response["meta"]["total"] - - if page < math.ceil(total / per_page): - return {"page": page + 1} - else: - return None - - def request_params( - self, - stream_state: Mapping[str, Any], - stream_slice: Mapping[str, Any] = None, - next_page_token: Mapping[str, Any] = None, - ) -> MutableMapping[str, Any]: - if self.has_date_param: - params = {"date_from": self.date_from} - else: - params = {} - if self.extra_params: - params.update(self.extra_params) - if next_page_token: - params.update(**next_page_token) - return params - - def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: - response_json = response.json() - yield response_json - - -class ChildStreamMixin: - - parent_stream_class: Optional[ZenloopStream] = None - - def stream_slices(self, sync_mode, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, any]]]: - # determine if parent_stream_class is Surveys or SurveyGroups - if self.parent_stream_class.__name__ == "Surveys": - public_hash_id = self.survey_id - else: - public_hash_id = self.survey_group_id - # loop through all survey_id's if None was provided - # return nothing otherwise - if not public_hash_id: - for item in self.parent_stream_class( - api_token=self.api_token, date_from=self.date_from, survey_id=self.survey_id, survey_group_id=self.survey_group_id - ).read_records(sync_mode=sync_mode): - # set date_from to most current cursor_field or date_from if not incremental - if stream_state: - date_from = stream_state[self.cursor_field] - else: - date_from = self.date_from - yield {"survey_slice": item["public_hash_id"], "date_from": date_from} - else: - yield None - - -class IncrementalZenloopStream(ZenloopStream, ABC): - # checkpoint stream reads after 1000 records. - state_checkpoint_interval = 1000 - cursor_field = "inserted_at" - - def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: - # latest_record has objects in answers - if latest_record: - # add 1 second to not pull latest_record again - latest_record_date = ( - datetime.strptime(latest_record[self.cursor_field], "%Y-%m-%dT%H:%M:%S.%fZ") + timedelta(seconds=1) - ).isoformat() + str("Z") - else: - latest_record_date = "" - max_record = max(latest_record_date, current_stream_state.get(self.cursor_field, "")) - return {self.cursor_field: max_record} - - def request_params( - self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None - ) -> MutableMapping[str, Any]: - params = super().request_params(stream_state, stream_slice, next_page_token) - if stream_state: - # if looped through all slices take its date_from parameter - # else no survey_id or survey_group_id provided -> take cursor_field - if stream_slice: - params["date_from"] = stream_slice["date_from"] - else: - params["date_from"] = stream_state[self.cursor_field] - return params - - -class Surveys(ZenloopStream): - # API Doc: https://docs.zenloop.com/reference#get-list-of-surveys - primary_key = None - has_date_param = False - extra_params = {"page": "1"} - use_cache = True - - def path( - self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None - ) -> str: - return "surveys" - - def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: - response_json = response.json() - yield from response_json.get("surveys", []) - - -class Answers(ChildStreamMixin, IncrementalZenloopStream): - # API Doc: https://docs.zenloop.com/reference#get-answers - primary_key = "id" - has_date_param = True - parent_stream_class = Surveys - extra_params = { - "page": "1", - "order_type": "desc", - "order_by": "inserted_at", - "date_shortcut": "custom", - "date_to": datetime.today().strftime("%Y-%m-%d"), - } - - def path( - self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None - ) -> str: - # take optional survey_id if entered - if self.survey_id: - return f"surveys/{self.survey_id}/answers" - # slice all survey_id's if nothing provided - else: - return f"surveys/{stream_slice['survey_slice']}/answers" - - def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: - response_json = response.json() - # select answers and surveys to be able to link answer to a survey - yield from response_json.get("answers", []) - - -class Properties(ChildStreamMixin, ZenloopStream): - # API Doc: https://docs.zenloop.com/reference/get-list-of-properties - primary_key = "id" - has_date_param = False - extra_params = {"page": "1"} - parent_stream_class = Surveys - - def path( - self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None - ) -> str: - # take optional survey_id if entered - if self.survey_id: - return f"surveys/{self.survey_id}/properties" - # slice all survey_id's if nothing provided - else: - return f"surveys/{stream_slice['survey_slice']}/properties" - - def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: - response_json = response.json() - # select properties and surveys to be able to link properties to a survey - yield from response_json.get("properties", []) - - -class SurveyGroups(ZenloopStream): - # API Doc: https://docs.zenloop.com/reference#get-list-of-survey-groups - primary_key = None - has_date_param = False - extra_params = {"page": "1"} - use_cache = True - - def path( - self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None - ) -> str: - return "survey_groups" - - def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: - response_json = response.json() - yield from response_json.get("survey_groups", []) - - -class AnswersSurveyGroup(ChildStreamMixin, IncrementalZenloopStream): - # API Doc: https://docs.zenloop.com/reference#get-answers-for-survey-group - primary_key = "id" - has_date_param = True - parent_stream_class = SurveyGroups - extra_params = { - "page": "1", - "order_type": "desc", - "order_by": "inserted_at", - "date_shortcut": "custom", - "date_to": datetime.today().strftime("%Y-%m-%d"), - } - - def path( - self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None - ) -> str: - # take optional survey_group_id if entered - if self.survey_group_id: - return f"survey_groups/{self.survey_group_id}/answers" - # slice all survey_group_id's if nothing provided - else: - return f"survey_groups/{stream_slice['survey_slice']}/answers" - - def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: - response_json = response.json() - # select answers and surveys to be able to link answer to a survey - yield from response_json.get("answers", []) - - -class SourceZenloop(AbstractSource): - def check_connection(self, logger, config) -> Tuple[bool, any]: - try: - authenticator = TokenAuthenticator(config["api_token"]) - url = f"{ZenloopStream.url_base}surveys" - - session = requests.get(url, headers=authenticator.get_auth_header()) - session.raise_for_status() - return True, None - except Exception as error: - return False, f"Unable to connect to Zenloop API with the provided credentials - {error}" - - def streams(self, config: Mapping[str, Any]) -> List[Stream]: - args = { - "api_token": TokenAuthenticator(token=config["api_token"]), - "date_from": config["date_from"], - "survey_id": config.get("survey_id"), - "survey_group_id": config.get("survey_group_id"), - } - return [Surveys(**args), Answers(**args), Properties(**args), SurveyGroups(**args), AnswersSurveyGroup(**args)] +# Declarative Source +class SourceZenloop(YamlDeclarativeSource): + def __init__(self): + super().__init__(path_to_yaml="zenloop.yaml") diff --git a/airbyte-integrations/connectors/source-zenloop/source_zenloop/streams.py b/airbyte-integrations/connectors/source-zenloop/source_zenloop/streams.py new file mode 100644 index 0000000000000..e6d469052d7af --- /dev/null +++ b/airbyte-integrations/connectors/source-zenloop/source_zenloop/streams.py @@ -0,0 +1,229 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + + +import math +from abc import ABC +from datetime import datetime, timedelta +from typing import Any, Iterable, Mapping, MutableMapping, Optional + +import requests +from airbyte_cdk.sources.streams.http import HttpStream + + +class ZenloopStream(HttpStream, ABC): + + url_base = "https://api.zenloop.com/v1/" + extra_params = None + has_date_param = False + + def __init__(self, api_token: str, date_from: Optional[str], survey_id, survey_group_id: Optional[str], **kwargs): + super().__init__(authenticator=api_token) + self.api_token = api_token + self.date_from = date_from or datetime.today().strftime("%Y-%m-%d") + self.survey_id = survey_id or None + self.survey_group_id = survey_group_id or None + + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + decoded_response = response.json() + page = decoded_response["meta"]["page"] + per_page = decoded_response["meta"]["per_page"] + total = decoded_response["meta"]["total"] + + if page < math.ceil(total / per_page): + return {"page": page + 1} + else: + return None + + def request_params( + self, + stream_state: Mapping[str, Any], + stream_slice: Mapping[str, Any] = None, + next_page_token: Mapping[str, Any] = None, + ) -> MutableMapping[str, Any]: + if self.has_date_param: + params = {"date_from": self.date_from} + else: + params = {} + if self.extra_params: + params.update(self.extra_params) + if next_page_token: + params.update(**next_page_token) + return params + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + response_json = response.json() + yield response_json + + +class ChildStreamMixin: + + parent_stream_class: Optional[ZenloopStream] = None + + def stream_slices(self, sync_mode, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, any]]]: + # determine if parent_stream_class is Surveys or SurveyGroups + if self.parent_stream_class.__name__ == "Surveys": + public_hash_id = self.survey_id + else: + public_hash_id = self.survey_group_id + # loop through all survey_id's if None was provided + # return nothing otherwise + if not public_hash_id: + for item in self.parent_stream_class( + api_token=self.api_token, date_from=self.date_from, survey_id=self.survey_id, survey_group_id=self.survey_group_id + ).read_records(sync_mode=sync_mode): + # set date_from to most current cursor_field or date_from if not incremental + if stream_state: + date_from = stream_state[self.cursor_field] + else: + date_from = self.date_from + yield {"survey_slice": item["public_hash_id"], "date_from": date_from} + else: + yield None + + +class IncrementalZenloopStream(ZenloopStream, ABC): + # checkpoint stream reads after 1000 records. + state_checkpoint_interval = 1000 + cursor_field = "inserted_at" + + def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: + # latest_record has objects in answers + if latest_record: + # add 1 second to not pull latest_record again + latest_record_date = ( + datetime.strptime(latest_record[self.cursor_field], "%Y-%m-%dT%H:%M:%S.%fZ") + timedelta(seconds=1) + ).isoformat() + str("Z") + else: + latest_record_date = "" + max_record = max(latest_record_date, current_stream_state.get(self.cursor_field, "")) + return {self.cursor_field: max_record} + + def request_params( + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + ) -> MutableMapping[str, Any]: + params = super().request_params(stream_state, stream_slice, next_page_token) + if stream_state: + # if looped through all slices take its date_from parameter + # else no survey_id or survey_group_id provided -> take cursor_field + if stream_slice: + params["date_from"] = stream_slice["date_from"] + else: + params["date_from"] = stream_state[self.cursor_field] + return params + + +class Surveys(ZenloopStream): + # API Doc: https://docs.zenloop.com/reference#get-list-of-surveys + primary_key = None + has_date_param = False + extra_params = {"page": "1"} + use_cache = True + + def path( + self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + ) -> str: + return "surveys" + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + response_json = response.json() + yield from response_json.get("surveys", []) + + +class Answers(ChildStreamMixin, IncrementalZenloopStream): + # API Doc: https://docs.zenloop.com/reference#get-answers + primary_key = "id" + has_date_param = True + parent_stream_class = Surveys + extra_params = { + "page": "1", + "order_type": "desc", + "order_by": "inserted_at", + "date_shortcut": "custom", + "date_to": datetime.today().strftime("%Y-%m-%d"), + } + + def path( + self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + ) -> str: + # take optional survey_id if entered + if self.survey_id: + return f"surveys/{self.survey_id}/answers" + # slice all survey_id's if nothing provided + else: + return f"surveys/{stream_slice['survey_slice']}/answers" + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + response_json = response.json() + # select answers and surveys to be able to link answer to a survey + yield from response_json.get("answers", []) + + +class Properties(ChildStreamMixin, ZenloopStream): + # API Doc: https://docs.zenloop.com/reference/get-list-of-properties + primary_key = "id" + has_date_param = False + extra_params = {"page": "1"} + parent_stream_class = Surveys + + def path( + self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + ) -> str: + # take optional survey_id if entered + if self.survey_id: + return f"surveys/{self.survey_id}/properties" + # slice all survey_id's if nothing provided + else: + return f"surveys/{stream_slice['survey_slice']}/properties" + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + response_json = response.json() + # select properties and surveys to be able to link properties to a survey + yield from response_json.get("properties", []) + + +class SurveyGroups(ZenloopStream): + # API Doc: https://docs.zenloop.com/reference#get-list-of-survey-groups + primary_key = None + has_date_param = False + extra_params = {"page": "1"} + use_cache = True + + def path( + self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + ) -> str: + return "survey_groups" + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + response_json = response.json() + yield from response_json.get("survey_groups", []) + + +class AnswersSurveyGroup(ChildStreamMixin, IncrementalZenloopStream): + # API Doc: https://docs.zenloop.com/reference#get-answers-for-survey-group + primary_key = "id" + has_date_param = True + parent_stream_class = SurveyGroups + extra_params = { + "page": "1", + "order_type": "desc", + "order_by": "inserted_at", + "date_shortcut": "custom", + "date_to": datetime.today().strftime("%Y-%m-%d"), + } + + def path( + self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + ) -> str: + # take optional survey_group_id if entered + if self.survey_group_id: + return f"survey_groups/{self.survey_group_id}/answers" + # slice all survey_group_id's if nothing provided + else: + return f"survey_groups/{stream_slice['survey_slice']}/answers" + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + response_json = response.json() + # select answers and surveys to be able to link answer to a survey + yield from response_json.get("answers", []) diff --git a/airbyte-integrations/connectors/source-zenloop/source_zenloop/zenloop.yaml b/airbyte-integrations/connectors/source-zenloop/source_zenloop/zenloop.yaml new file mode 100644 index 0000000000000..f9605217cf20f --- /dev/null +++ b/airbyte-integrations/connectors/source-zenloop/source_zenloop/zenloop.yaml @@ -0,0 +1,149 @@ +version: "0.1.0" + +definitions: + selector: + type: RecordSelector + extractor: + type: DpathExtractor + field_pointer: ["{{ options['data_field'] }}"] + requester: + type: HttpRequester + name: "{{ options['name'] }}" + http_method: "GET" + authenticator: + type: BearerAuthenticator + api_token: "{{ config['api_token'] }}" + retriever: + type: SimpleRetriever + $options: + url_base: "https://api.zenloop.com/v1/" + name: "{{ options['name'] }}" + record_selector: + $ref: "*ref(definitions.selector)" + paginator: + type: DefaultPaginator + pagination_strategy: + type: PageIncrement + page_size: 50 + start_from_page: 1 + page_size_option: + field_name: "per_page" + inject_into: "request_parameter" + page_token_option: + inject_into: "path" + base_stream: + retriever: + $ref: "*ref(definitions.retriever)" + requester: + $ref: "*ref(definitions.requester)" + incremental_base_stream: + $ref: "*ref(definitions.base_stream)" + stream_cursor_field: "inserted_at" + retriever: + $ref: "*ref(definitions.retriever)" + requester: + $ref: "*ref(definitions.requester)" + request_options_provider: + request_parameters: + order_type: "desc" + order_by: "inserted_at" + date_shortcut: "custom" + surveys: + $ref: "*ref(definitions.base_stream)" + $options: + name: "surveys" + path: "surveys" + data_field: "surveys" + surveys_slicer: + class_name: source_zenloop.components.ZenloopSubstreamSlicer + $options: + config_parent_field: "survey_id" + parent_stream_configs: + - stream: "*ref(definitions.surveys)" + parent_key: public_hash_id + stream_slice_field: id + survey_groups: + $ref: "*ref(definitions.base_stream)" + $options: + name: "survey_groups" + path: "survey_groups" + data_field: "survey_groups" + survey_groups_slicer: + class_name: source_zenloop.components.ZenloopSubstreamSlicer + $options: + config_parent_field: "survey_group_id" + parent_stream_configs: + - stream: "*ref(definitions.survey_groups)" + parent_key: public_hash_id + stream_slice_field: id + date_slicer: + type: DatetimeStreamSlicer + cursor_field: "inserted_at" + datetime_format: "%Y-%m-%dT%H:%M:%S.%fZ" + start_datetime: + datetime: "{{ config['date_from'] }}" + datetime_format: "%Y-%m-%d" + end_datetime: + datetime: "{{ today_utc() }}" + datetime_format: "%Y-%m-%d" + step: "1m" + end_time_option: + field_name: "date_to" + inject_into: "request_parameter" + start_time_option: + field_name: "date_from" + inject_into: "request_parameter" + properties: + $ref: "*ref(definitions.base_stream)" + $options: + name: "properties" + data_field: "properties" + retriever: + $ref: "*ref(definitions.retriever)" + requester: + $ref: "*ref(definitions.requester)" + path: "{{ 'surveys/' + config['survey_id'] + '/properties' if config['survey_id'] else 'surveys/' + stream_slice.id + '/properties' }}" + stream_slicer: + $ref: "*ref(definitions.surveys_slicer)" + answers: + $ref: "*ref(definitions.incremental_base_stream)" + $options: + name: "answers" + data_field: "answers" + retriever: + $ref: "*ref(definitions.retriever)" + requester: + $ref: "*ref(definitions.incremental_base_stream.retriever.requester)" + path: "{{ 'surveys/' + stream_slice.id + '/answers' }}" + stream_slicer: + type: CartesianProductStreamSlicer + stream_slicers: + - "*ref(definitions.surveys_slicer)" + - "*ref(definitions.date_slicer)" + answers_survey_group: + $ref: "*ref(definitions.incremental_base_stream)" + $options: + name: "answers_survey_group" + data_field: "answers" + retriever: + $ref: "*ref(definitions.retriever)" + requester: + $ref: "*ref(definitions.incremental_base_stream.retriever.requester)" + path: "{{ 'survey_groups/' + stream_slice.id + '/answers' }}" + stream_slicer: + type: CartesianProductStreamSlicer + stream_slicers: + - "*ref(definitions.survey_groups_slicer)" + - "*ref(definitions.date_slicer)" + + +streams: + - "*ref(definitions.surveys)" + - "*ref(definitions.survey_groups)" + - "*ref(definitions.properties)" + - "*ref(definitions.answers)" + - "*ref(definitions.answers_survey_group)" + +check: + type: CheckStream + stream_names: ["surveys"] \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-zenloop/unit_tests/__init__.py b/airbyte-integrations/connectors/source-zenloop/unit_tests/__init__.py deleted file mode 100644 index 46b7376756ec6..0000000000000 --- a/airbyte-integrations/connectors/source-zenloop/unit_tests/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -# -# Copyright (c) 2021 Airbyte, Inc., all rights reserved. -# diff --git a/airbyte-integrations/connectors/source-zenloop/unit_tests/conftest.py b/airbyte-integrations/connectors/source-zenloop/unit_tests/conftest.py deleted file mode 100644 index b697374e6477f..0000000000000 --- a/airbyte-integrations/connectors/source-zenloop/unit_tests/conftest.py +++ /dev/null @@ -1,10 +0,0 @@ -# -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. -# - -from pytest import fixture - - -@fixture -def config(): - return {"api_token": "", "date_from": "2021-07-01", "survey_id": "", "survey_group_id": ""} diff --git a/airbyte-integrations/connectors/source-zenloop/unit_tests/test_incremental_streams.py b/airbyte-integrations/connectors/source-zenloop/unit_tests/test_incremental_streams.py deleted file mode 100644 index 6dc9696cc95a8..0000000000000 --- a/airbyte-integrations/connectors/source-zenloop/unit_tests/test_incremental_streams.py +++ /dev/null @@ -1,101 +0,0 @@ -# -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. -# - - -from unittest.mock import MagicMock - -from airbyte_cdk.models import SyncMode -from pytest import fixture -from source_zenloop.source import Answers, AnswersSurveyGroup, IncrementalZenloopStream - - -@fixture -def patch_incremental_base_class(mocker): - # Mock abstract methods to enable instantiating abstract class - mocker.patch.object(IncrementalZenloopStream, "path", "v0/example_endpoint") - mocker.patch.object(IncrementalZenloopStream, "primary_key", "test_primary_key") - mocker.patch.object(IncrementalZenloopStream, "__abstractmethods__", set()) - - -def test_cursor_field(patch_incremental_base_class, config): - stream = IncrementalZenloopStream(config["api_token"], config["date_from"], config["survey_id"], config["survey_group_id"]) - expected_cursor_field = "inserted_at" - assert stream.cursor_field == expected_cursor_field - - -def test_get_updated_state(patch_incremental_base_class, config): - stream = IncrementalZenloopStream(config["api_token"], config["date_from"], config["survey_id"], config["survey_group_id"]) - expected_cursor_field = "inserted_at" - inputs = { - "current_stream_state": {expected_cursor_field: "2021-07-24T03:30:30.038549Z"}, - "latest_record": {"inserted_at": "2021-10-20T03:30:30.038549Z"}, - } - expected_state = {expected_cursor_field: "2021-10-20T03:30:31.038549Z"} - assert stream.get_updated_state(**inputs) == expected_state - - -def test_stream_slices(patch_incremental_base_class, config): - expected_cursor_field = "inserted_at" - inputs = { - "sync_mode": SyncMode.incremental, - "cursor_field": expected_cursor_field, - "stream_state": {expected_cursor_field: "2021-10-20T03:30:30Z"}, - } - expected_stream_slice = [None] - - stream = IncrementalZenloopStream(config["api_token"], config["date_from"], config["survey_id"], config["survey_group_id"]) - assert list(stream.stream_slices(**inputs)) == expected_stream_slice - - stream = IncrementalZenloopStream(config["api_token"], config["date_from"], config["survey_id"], None) - assert list(stream.stream_slices(**inputs)) == expected_stream_slice - - stream = IncrementalZenloopStream(config["api_token"], config["date_from"], None, config["survey_group_id"]) - assert list(stream.stream_slices(**inputs)) == expected_stream_slice - - -def test_supports_incremental(patch_incremental_base_class, mocker, config): - mocker.patch.object(IncrementalZenloopStream, "cursor_field", "dummy_field") - stream = IncrementalZenloopStream(config["api_token"], config["date_from"], config["survey_id"], config["survey_group_id"]) - assert stream.supports_incremental - - -def test_source_defined_cursor(patch_incremental_base_class, config): - stream = IncrementalZenloopStream(config["api_token"], config["date_from"], config["survey_id"], config["survey_group_id"]) - assert stream.source_defined_cursor - - -def test_stream_checkpoint_interval(patch_incremental_base_class, config): - stream = IncrementalZenloopStream(config["api_token"], config["date_from"], config["survey_id"], config["survey_group_id"]) - expected_checkpoint_interval = 1000 - assert stream.state_checkpoint_interval == expected_checkpoint_interval - - -def test_parse_response_answers(patch_incremental_base_class, config): - stream = Answers(**config) - response = MagicMock() - response.json.return_value = {"answers": [{"id": 123, "name": "John Doe"}]} - inputs = {"response": response} - expected_parsed_object = {"id": 123, "name": "John Doe"} - assert next(stream.parse_response(**inputs)) == expected_parsed_object - - -def test_parse_response_answers_survey_groups(patch_incremental_base_class, config): - stream = AnswersSurveyGroup(**config) - response = MagicMock() - response.json.return_value = {"answers": [{"id": 123, "name": "John Doe"}]} - inputs = {"response": response} - expected_parsed_object = {"id": 123, "name": "John Doe"} - assert next(stream.parse_response(**inputs)) == expected_parsed_object - - -def test_surveys_path(config): - stream = Answers(**config) - expected = "surveys//answers" - assert stream.path() == expected - - -def test_survey_groups_path(config): - stream = AnswersSurveyGroup(**config) - expected = "survey_groups//answers" - assert stream.path() == expected diff --git a/airbyte-integrations/connectors/source-zenloop/unit_tests/test_source.py b/airbyte-integrations/connectors/source-zenloop/unit_tests/test_source.py deleted file mode 100644 index f258ba9bc7613..0000000000000 --- a/airbyte-integrations/connectors/source-zenloop/unit_tests/test_source.py +++ /dev/null @@ -1,38 +0,0 @@ -# -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. -# - -from unittest.mock import MagicMock - -import responses -from source_zenloop.source import SourceZenloop - - -@responses.activate -def test_check_connection_success(mocker, config): - responses.add( - responses.GET, - "https://api.zenloop.com/v1/surveys", - ) - source = SourceZenloop() - logger_mock = MagicMock() - assert source.check_connection(logger_mock, config) == (True, None) - - -@responses.activate -def test_check_connection_fail(mocker, config): - responses.add(responses.GET, "https://api.zenloop.com/v1/surveys", json={"error": "Unauthorized"}, status=401) - source = SourceZenloop() - logger_mock = MagicMock() - assert source.check_connection(logger_mock, config) == ( - False, - "Unable to connect to Zenloop API with the provided credentials - 401 Client Error: Unauthorized for url: https://api.zenloop.com/v1/surveys", - ) - - -def test_streams(mocker): - source = SourceZenloop() - config_mock = MagicMock() - streams = source.streams(config_mock) - expected_streams_number = 5 - assert len(streams) == expected_streams_number diff --git a/airbyte-integrations/connectors/source-zenloop/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-zenloop/unit_tests/test_streams.py deleted file mode 100644 index 700c8c4df5283..0000000000000 --- a/airbyte-integrations/connectors/source-zenloop/unit_tests/test_streams.py +++ /dev/null @@ -1,113 +0,0 @@ -# -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. -# - -from http import HTTPStatus -from unittest.mock import MagicMock - -import pytest -from source_zenloop.source import Properties, SurveyGroups, Surveys, ZenloopStream - - -@pytest.fixture -def patch_base_class(mocker): - # Mock abstract methods to enable instantiating abstract class - mocker.patch.object(ZenloopStream, "path", "v0/example_endpoint") - mocker.patch.object(ZenloopStream, "primary_key", "test_primary_key") - mocker.patch.object(ZenloopStream, "__abstractmethods__", set()) - - -def test_request_params(patch_base_class, config): - stream = ZenloopStream(**config) - inputs = {"stream_slice": None, "stream_state": None, "next_page_token": {"page": "1"}} - expected_params = {"page": "1"} - assert stream.request_params(**inputs) == expected_params - - -def test_next_page_token(patch_base_class, config): - stream = ZenloopStream(**config) - inputs = {"response": MagicMock()} - inputs["response"].json.return_value = {"meta": {"page": 1, "per_page": 12, "total": 8}} - expected_token = None - assert stream.next_page_token(**inputs) == expected_token - - -def test_parse_response(patch_base_class, config): - stream = ZenloopStream(**config) - response = MagicMock() - response.json.return_value = {"answers": [{"id": 123, "name": "John Doe"}]} - inputs = {"response": response} - expected_parsed_object = {"answers": [{"id": 123, "name": "John Doe"}]} - assert next(stream.parse_response(**inputs)) == expected_parsed_object - - -def test_parse_response_surveys(patch_base_class, config): - stream = Surveys(**config) - response = MagicMock() - response.json.return_value = {"surveys": [{"id": 123, "name": "John Doe"}]} - inputs = {"response": response} - expected_parsed_object = {"id": 123, "name": "John Doe"} - assert next(stream.parse_response(**inputs)) == expected_parsed_object - - -def test_parse_response_survey_groups(patch_base_class, config): - stream = SurveyGroups(**config) - response = MagicMock() - response.json.return_value = {"survey_groups": [{"id": 123, "name": "John Doe"}]} - inputs = {"response": response} - expected_parsed_object = {"id": 123, "name": "John Doe"} - assert next(stream.parse_response(**inputs)) == expected_parsed_object - - -def test_surveys_path(config): - stream = Surveys(**config) - expected = "surveys" - assert stream.path() == expected - - -def test_survey_groups_path(config): - stream = SurveyGroups(**config) - expected = "survey_groups" - assert stream.path() == expected - - -def test_properties_path(config): - stream = Properties(**config) - expected = "surveys//properties" - assert stream.path() == expected - - -def test_request_headers(patch_base_class, config): - stream = ZenloopStream(**config) - inputs = {"stream_slice": None, "stream_state": None, "next_page_token": None} - expected_headers = {} - assert stream.request_headers(**inputs) == expected_headers - - -def test_http_method(patch_base_class, config): - stream = ZenloopStream(**config) - expected_method = "GET" - assert stream.http_method == expected_method - - -@pytest.mark.parametrize( - ("http_status", "should_retry"), - [ - (HTTPStatus.OK, False), - (HTTPStatus.BAD_REQUEST, False), - (HTTPStatus.TOO_MANY_REQUESTS, True), - (HTTPStatus.INTERNAL_SERVER_ERROR, True), - ], -) -def test_should_retry(patch_base_class, config, http_status, should_retry): - response_mock = MagicMock() - response_mock.status_code = http_status - stream = ZenloopStream(**config) - assert stream.should_retry(response_mock) == should_retry - - -def test_backoff_time(patch_base_class, config): - response_mock = MagicMock() - stream = ZenloopStream(**config) - expected_backoff_time = None - assert stream.backoff_time(response_mock) == expected_backoff_time diff --git a/docs/integrations/sources/zenloop.md b/docs/integrations/sources/zenloop.md index 89efbb21848af..948322e183fbd 100644 --- a/docs/integrations/sources/zenloop.md +++ b/docs/integrations/sources/zenloop.md @@ -1,10 +1,47 @@ # Zenloop -## Sync overview - -This source can sync data for the [Zenloop API](https://docs.zenloop.com/reference). It supports both Full Refresh and Incremental syncs for Answer endpoints. You can choose if this connector will copy only the new or updated data, or all rows in the tables and columns you set up for replication, every time a sync is run. - -### Output schema +This page contains the setup guide and reference information for the Zenloop source connector. + +## Prerequisites + +**For Airbyte Cloud:** + +1. [Log into your Airbyte Cloud](https://cloud.airbyte.io/workspaces). +2. Click **Sources** and then click **+ New source**. +3. On the Set up the source page, select **Zenloop** from the Source type dropdown. +4. Enter the name for the Zenloop connector. +5. Enter your **API token** +6. For **Date from**, enter the date in YYYY-MM-DDTHH:mm:ssZ format. The data added on and after this date will be replicated. +7. Enter your **Survey ID**. Zenloop Survey ID. Can be found here. Leave empty to pull answers from all surveys. (Optional) +8. Enter your **Survey Group ID**. Zenloop Survey Group ID. Can be found by pulling All Survey Groups via SurveyGroups stream. Leave empty to pull answers from all survey groups. (Optional) +9. Click **Set up source**. + + + +**For Airbyte Open Source:** + +1. Navigate to the Airbyte Open Source dashboard. +2. Click **Sources** and then click **+ New source**. +3. On the Set up the source page, select **Zenloop** from the Source type dropdown. +4. Enter the name for the Zenloop connector. +5. Enter your **API token** +6. For **Date from**, enter the date in YYYY-MM-DDTHH:mm:ssZ format. The data added on and after this date will be replicated. +7. Enter your **Survey ID**. Zenloop Survey ID. Can be found here. Leave empty to pull answers from all surveys. (Optional) +8. Enter your **Survey Group ID**. Zenloop Survey Group ID. Can be found by pulling All Survey Groups via SurveyGroups stream. Leave empty to pull answers from all survey groups. (Optional) +9. Click **Set up source**. + + +## Supported sync modes + +The Zenloop source connector supports the following [sync modes](https://docs.airbyte.com/cloud/core-concepts#connection-sync-modes): + +| Feature | Supported?\(Yes/No\) | +| :---------------- | :------------------- | +| Full Refresh Sync | Yes | +| Incremental Sync | Yes | +| Namespaces | No | + +## Supported Streams This Source is capable of syncing the following core Streams: @@ -16,44 +53,26 @@ This Source is capable of syncing the following core Streams: The `Answers`, `AnswersSurveyGroup` and `Properties` stream respectively have an optional survey_id parameter that can be set by filling the `public_hash_id` field of the connector configuration. If not provided answers for all surveys (groups) will be pulled. -### Data type mapping - -| Integration Type | Airbyte Type | Notes | -| :--------------- | :----------- | :---- | -| `string` | `string` | | -| `integer` | `integer` | | -| `number` | `number` | | -| `array` | `array` | | -| `object` | `object` | | - -### Features - -| Feature | Supported?\(Yes/No\) | Notes | -| :---------------- | :------------------- | :---- | -| Full Refresh Sync | Yes | | -| Incremental Sync | Yes | | -| Namespaces | No | | - -### Performance considerations +## Performance considerations The Zenloop connector should not run into Zenloop API limitations under normal usage. Please [create an issue](https://github.com/airbytehq/airbyte/issues) if you see any rate limit issues that are not automatically retried successfully. -## Getting started - -### Requirements - -* Zenloop account -* Zenloop API token - -### Setup guide +## Data type map -Please register on Zenloop and retrieve your API token [here](https://app.zenloop.com/settings/api). +| Integration Type | Airbyte Type | +| :--------------- | :----------- | +| `string` | `string` | +| `integer` | `integer` | +| `number` | `number` | +| `array` | `array` | +| `object` | `object` | ## Changelog | Version | Date | Pull Request | Subject | | :------ | :--------- | :------------------------------------------------------- | :---------------------------- | -| 0.1.3 | 2022-09-28 | [17304](https://github.com/airbytehq/airbyte/pull/17304) | Migrate to per-stream states. | +| 0.1.4 | 2022-11-18 | [19624](https://github.com/airbytehq/airbyte/pull/19624) | Migrate to low code | +| 0.1.3 | 2022-09-28 | [17304](https://github.com/airbytehq/airbyte/pull/17304) | Migrate to per-stream states | | 0.1.2 | 2022-08-22 | [15843](https://github.com/airbytehq/airbyte/pull/15843) | Adds Properties stream | | 0.1.1 | 2021-10-26 | [8299](https://github.com/airbytehq/airbyte/pull/8299) | Fix missing seed files | | 0.1.0 | 2021-10-26 | [7380](https://github.com/airbytehq/airbyte/pull/7380) | Initial Release |