From 3981f0f6044c92bad351091f96f1bfc5859e7121 Mon Sep 17 00:00:00 2001 From: Wadii Date: Wed, 12 Jan 2022 15:03:49 +0100 Subject: [PATCH 01/22] Spec json file for PersistIq --- .../connectors/source-persistiq/.dockerignore | 6 + .../connectors/source-persistiq/Dockerfile | 38 ++++ .../connectors/source-persistiq/README.md | 132 +++++++++++ .../acceptance-test-config.yml | 30 +++ .../acceptance-test-docker.sh | 16 ++ .../connectors/source-persistiq/build.gradle | 9 + .../integration_tests/__init__.py | 3 + .../integration_tests/abnormal_state.json | 5 + .../integration_tests/acceptance.py | 16 ++ .../integration_tests/catalog.json | 39 ++++ .../integration_tests/configured_catalog.json | 22 ++ .../integration_tests/invalid_config.json | 3 + .../integration_tests/sample_config.json | 3 + .../integration_tests/sample_state.json | 5 + .../connectors/source-persistiq/main.py | 13 ++ .../source-persistiq/requirements.txt | 2 + .../connectors/source-persistiq/setup.py | 29 +++ .../source_persistiq/__init__.py | 8 + .../source_persistiq/schemas/TODO.md | 25 +++ .../source_persistiq/schemas/customers.json | 16 ++ .../source_persistiq/schemas/employees.json | 19 ++ .../source_persistiq/source.py | 206 ++++++++++++++++++ .../source_persistiq/spec.json | 16 ++ .../connectors/source-persistiq/spec.json | 17 ++ .../source-persistiq/unit_tests/__init__.py | 3 + .../unit_tests/test_incremental_streams.py | 59 +++++ .../unit_tests/test_source.py | 22 ++ .../unit_tests/test_streams.py | 83 +++++++ 28 files changed, 845 insertions(+) create mode 100644 airbyte-integrations/connectors/source-persistiq/.dockerignore create mode 100644 airbyte-integrations/connectors/source-persistiq/Dockerfile create mode 100644 airbyte-integrations/connectors/source-persistiq/README.md create mode 100644 airbyte-integrations/connectors/source-persistiq/acceptance-test-config.yml create mode 100644 airbyte-integrations/connectors/source-persistiq/acceptance-test-docker.sh create mode 100644 airbyte-integrations/connectors/source-persistiq/build.gradle create mode 100644 airbyte-integrations/connectors/source-persistiq/integration_tests/__init__.py create mode 100644 airbyte-integrations/connectors/source-persistiq/integration_tests/abnormal_state.json create mode 100644 airbyte-integrations/connectors/source-persistiq/integration_tests/acceptance.py create mode 100644 airbyte-integrations/connectors/source-persistiq/integration_tests/catalog.json create mode 100644 airbyte-integrations/connectors/source-persistiq/integration_tests/configured_catalog.json create mode 100644 airbyte-integrations/connectors/source-persistiq/integration_tests/invalid_config.json create mode 100644 airbyte-integrations/connectors/source-persistiq/integration_tests/sample_config.json create mode 100644 airbyte-integrations/connectors/source-persistiq/integration_tests/sample_state.json create mode 100644 airbyte-integrations/connectors/source-persistiq/main.py create mode 100644 airbyte-integrations/connectors/source-persistiq/requirements.txt create mode 100644 airbyte-integrations/connectors/source-persistiq/setup.py create mode 100644 airbyte-integrations/connectors/source-persistiq/source_persistiq/__init__.py create mode 100644 airbyte-integrations/connectors/source-persistiq/source_persistiq/schemas/TODO.md create mode 100644 airbyte-integrations/connectors/source-persistiq/source_persistiq/schemas/customers.json create mode 100644 airbyte-integrations/connectors/source-persistiq/source_persistiq/schemas/employees.json create mode 100644 
airbyte-integrations/connectors/source-persistiq/source_persistiq/source.py create mode 100644 airbyte-integrations/connectors/source-persistiq/source_persistiq/spec.json create mode 100644 airbyte-integrations/connectors/source-persistiq/spec.json create mode 100644 airbyte-integrations/connectors/source-persistiq/unit_tests/__init__.py create mode 100644 airbyte-integrations/connectors/source-persistiq/unit_tests/test_incremental_streams.py create mode 100644 airbyte-integrations/connectors/source-persistiq/unit_tests/test_source.py create mode 100644 airbyte-integrations/connectors/source-persistiq/unit_tests/test_streams.py diff --git a/airbyte-integrations/connectors/source-persistiq/.dockerignore b/airbyte-integrations/connectors/source-persistiq/.dockerignore new file mode 100644 index 0000000000000..7754a0a84ebcd --- /dev/null +++ b/airbyte-integrations/connectors/source-persistiq/.dockerignore @@ -0,0 +1,6 @@ +* +!Dockerfile +!main.py +!source_persistiq +!setup.py +!secrets diff --git a/airbyte-integrations/connectors/source-persistiq/Dockerfile b/airbyte-integrations/connectors/source-persistiq/Dockerfile new file mode 100644 index 0000000000000..e533a190826d1 --- /dev/null +++ b/airbyte-integrations/connectors/source-persistiq/Dockerfile @@ -0,0 +1,38 @@ +FROM python:3.7.11-alpine3.14 as base + +# build and load all requirements +FROM base as builder +WORKDIR /airbyte/integration_code + +# upgrade pip to the latest version +RUN apk --no-cache upgrade \ + && pip install --upgrade pip \ + && apk --no-cache add tzdata build-base + + +COPY setup.py ./ +# install necessary packages to a temporary folder +RUN pip install --prefix=/install . + +# build a clean environment +FROM base +WORKDIR /airbyte/integration_code + +# copy all loaded and built libraries to a pure basic image +COPY --from=builder /install /usr/local +# add default timezone settings +COPY --from=builder /usr/share/zoneinfo/Etc/UTC /etc/localtime +RUN echo "Etc/UTC" > /etc/timezone + +# bash is installed for more convenient debugging. +RUN apk --no-cache add bash + +# copy payload code only +COPY main.py ./ +COPY source_persistiq ./source_persistiq + +ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" +ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] + +LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.name=airbyte/source-persistiq diff --git a/airbyte-integrations/connectors/source-persistiq/README.md b/airbyte-integrations/connectors/source-persistiq/README.md new file mode 100644 index 0000000000000..623a6b78d8798 --- /dev/null +++ b/airbyte-integrations/connectors/source-persistiq/README.md @@ -0,0 +1,132 @@ +# Persistiq Source + +This is the repository for the Persistiq source connector, written in Python. +For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.io/integrations/sources/persistiq). + +## Local development + +### Prerequisites +**To iterate on this connector, make sure to complete this prerequisites section.** + +#### Minimum Python version required `= 3.7.0` + +#### Build & Activate Virtual Environment and install dependencies +From this connector directory, create a virtual environment: +``` +python -m venv .venv +``` + +This will generate a virtualenv for this module in `.venv/`. Make sure this venv is active in your +development environment of choice. 
To activate it from the terminal, run: +``` +source .venv/bin/activate +pip install -r requirements.txt +pip install '.[tests]' +``` +If you are in an IDE, follow your IDE's instructions to activate the virtualenv. + +Note that while we are installing dependencies from `requirements.txt`, you should only edit `setup.py` for your dependencies. `requirements.txt` is +used for editable installs (`pip install -e`) to pull in Python dependencies from the monorepo and will call `setup.py`. +If this is mumbo jumbo to you, don't worry about it, just put your deps in `setup.py` but install using `pip install -r requirements.txt` and everything +should work as you expect. + +#### Building via Gradle +You can also build the connector in Gradle. This is typically used in CI and not needed for your development workflow. + +To build using Gradle, from the Airbyte repository root, run: +``` +./gradlew :airbyte-integrations:connectors:source-persistiq:build +``` + +#### Create credentials +**If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.io/integrations/sources/persistiq) +to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `source_persistiq/spec.json` file. +Note that any directory named `secrets` is gitignored across the entire Airbyte repo, so there is no danger of accidentally checking in sensitive information. +See `integration_tests/sample_config.json` for a sample config file. + +**If you are an Airbyte core member**, copy the credentials in Lastpass under the secret name `source persistiq test creds` +and place them into `secrets/config.json`. + +### Locally running the connector +``` +python main.py spec +python main.py check --config secrets/config.json +python main.py discover --config secrets/config.json +python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json +``` + +### Locally running the connector docker image + +#### Build +First, make sure you build the latest Docker image: +``` +docker build . -t airbyte/source-persistiq:dev +``` + +You can also build the connector image via Gradle: +``` +./gradlew :airbyte-integrations:connectors:source-persistiq:airbyteDocker +``` +When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in +the Dockerfile. + +#### Run +Then run any of the connector commands as follows: +``` +docker run --rm airbyte/source-persistiq:dev spec +docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-persistiq:dev check --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-persistiq:dev discover --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/source-persistiq:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json +``` +## Testing +Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named. 
+First install test dependencies into your virtual environment: +``` +pip install .[tests] +``` +### Unit Tests +To run unit tests locally, from the connector directory run: +``` +python -m pytest unit_tests +``` + +### Integration Tests +There are two types of integration tests: Acceptance Tests (Airbyte's test suite for all source connectors) and custom integration tests (which are specific to this connector). +#### Custom Integration tests +Place custom tests inside the `integration_tests/` folder, then, from the connector root, run +``` +python -m pytest integration_tests +``` +#### Acceptance Tests +Customize the `acceptance-test-config.yml` file to configure the tests. See [Source Acceptance Tests](https://docs.airbyte.io/connector-development/testing-connectors/source-acceptance-tests-reference) for more information. +If your connector requires creating or destroying resources for use during acceptance tests, create fixtures for them and place them inside `integration_tests/acceptance.py`. +To run your integration tests with acceptance tests, from the connector root, run +``` +python -m pytest integration_tests -p integration_tests.acceptance +``` +To run your integration tests with Docker, use the `acceptance-test-docker.sh` script at the root of this connector directory. + +### Using Gradle to run tests +All commands should be run from the Airbyte project root. +To run unit tests: +``` +./gradlew :airbyte-integrations:connectors:source-persistiq:unitTest +``` +To run acceptance and custom integration tests: +``` +./gradlew :airbyte-integrations:connectors:source-persistiq:integrationTest +``` + +## Dependency Management +All of your dependencies should go in `setup.py`, NOT `requirements.txt`. The requirements file is only used to connect internal Airbyte dependencies in the monorepo for local development. +We split dependencies between two groups: dependencies that are +* required for your connector to work go in the `MAIN_REQUIREMENTS` list. +* required for testing go in the `TEST_REQUIREMENTS` list. + +### Publishing a new version of the connector +You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what? +1. Make sure your changes pass unit and integration tests. +1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)). +1. Create a Pull Request. +1. Pat yourself on the back for being an awesome contributor. +1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master.
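+ +### Example `secrets/config.json` +The "Create credentials" section above asks for a `secrets/config.json` conforming to `source_persistiq/spec.json`. As a minimal, hedged sketch based on the spec in this patch, which requires only the `api_key` field (the value below is a placeholder, not a real credential): +``` +{ + "api_key": "<your PersistIq API key>" +} +```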
diff --git a/airbyte-integrations/connectors/source-persistiq/acceptance-test-config.yml b/airbyte-integrations/connectors/source-persistiq/acceptance-test-config.yml new file mode 100644 index 0000000000000..44743a58d117d --- /dev/null +++ b/airbyte-integrations/connectors/source-persistiq/acceptance-test-config.yml @@ -0,0 +1,30 @@ +# See [Source Acceptance Tests](https://docs.airbyte.io/connector-development/testing-connectors/source-acceptance-tests-reference) +# for more information about how to configure these tests +connector_image: airbyte/source-persistiq:dev +tests: + spec: + - spec_path: "source_persistiq/spec.json" + connection: + - config_path: "secrets/config.json" + status: "succeed" + - config_path: "integration_tests/invalid_config.json" + status: "failed" + discovery: + - config_path: "secrets/config.json" + basic_read: + - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" + empty_streams: [] +# TODO uncomment this block to specify that the tests should assert the connector outputs the records provided in the input file +# expect_records: +# path: "integration_tests/expected_records.txt" +# extra_fields: no +# exact_order: no +# extra_records: yes + incremental: # TODO if your connector does not implement incremental sync, remove this block + - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" + future_state_path: "integration_tests/abnormal_state.json" + full_refresh: + - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" diff --git a/airbyte-integrations/connectors/source-persistiq/acceptance-test-docker.sh b/airbyte-integrations/connectors/source-persistiq/acceptance-test-docker.sh new file mode 100644 index 0000000000000..c51577d10690c --- /dev/null +++ b/airbyte-integrations/connectors/source-persistiq/acceptance-test-docker.sh @@ -0,0 +1,16 @@ +#!/usr/bin/env sh + +# Build latest connector image +docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2-) + +# Pull latest acctest image +docker pull airbyte/source-acceptance-test:latest + +# Run +docker run --rm -it \ + -v /var/run/docker.sock:/var/run/docker.sock \ + -v /tmp:/tmp \ + -v $(pwd):/test_input \ + airbyte/source-acceptance-test \ + --acceptance-test-config /test_input + diff --git a/airbyte-integrations/connectors/source-persistiq/build.gradle b/airbyte-integrations/connectors/source-persistiq/build.gradle new file mode 100644 index 0000000000000..a7700be96ab8e --- /dev/null +++ b/airbyte-integrations/connectors/source-persistiq/build.gradle @@ -0,0 +1,9 @@ +plugins { + id 'airbyte-python' + id 'airbyte-docker' + id 'airbyte-source-acceptance-test' +} + +airbytePython { + moduleDirectory 'source_persistiq' +} diff --git a/airbyte-integrations/connectors/source-persistiq/integration_tests/__init__.py b/airbyte-integrations/connectors/source-persistiq/integration_tests/__init__.py new file mode 100644 index 0000000000000..46b7376756ec6 --- /dev/null +++ b/airbyte-integrations/connectors/source-persistiq/integration_tests/__init__.py @@ -0,0 +1,3 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+# diff --git a/airbyte-integrations/connectors/source-persistiq/integration_tests/abnormal_state.json b/airbyte-integrations/connectors/source-persistiq/integration_tests/abnormal_state.json new file mode 100644 index 0000000000000..52b0f2c2118f4 --- /dev/null +++ b/airbyte-integrations/connectors/source-persistiq/integration_tests/abnormal_state.json @@ -0,0 +1,5 @@ +{ + "todo-stream-name": { + "todo-field-name": "todo-abnormal-value" + } +} diff --git a/airbyte-integrations/connectors/source-persistiq/integration_tests/acceptance.py b/airbyte-integrations/connectors/source-persistiq/integration_tests/acceptance.py new file mode 100644 index 0000000000000..056971f954502 --- /dev/null +++ b/airbyte-integrations/connectors/source-persistiq/integration_tests/acceptance.py @@ -0,0 +1,16 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# + + +import pytest + +pytest_plugins = ("source_acceptance_test.plugin",) + + +@pytest.fixture(scope="session", autouse=True) +def connector_setup(): + """This fixture is a placeholder for external resources that acceptance test might require.""" + # TODO: setup test dependencies if needed. otherwise remove the TODO comments + yield + # TODO: clean up test dependencies diff --git a/airbyte-integrations/connectors/source-persistiq/integration_tests/catalog.json b/airbyte-integrations/connectors/source-persistiq/integration_tests/catalog.json new file mode 100644 index 0000000000000..6799946a68514 --- /dev/null +++ b/airbyte-integrations/connectors/source-persistiq/integration_tests/catalog.json @@ -0,0 +1,39 @@ +{ + "streams": [ + { + "name": "TODO fix this file", + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": "column1", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "column1": { + "type": "string" + }, + "column2": { + "type": "number" + } + } + } + }, + { + "name": "table1", + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": false, + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "column1": { + "type": "string" + }, + "column2": { + "type": "number" + } + } + } + } + ] +} diff --git a/airbyte-integrations/connectors/source-persistiq/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-persistiq/integration_tests/configured_catalog.json new file mode 100644 index 0000000000000..36f0468db0d8f --- /dev/null +++ b/airbyte-integrations/connectors/source-persistiq/integration_tests/configured_catalog.json @@ -0,0 +1,22 @@ +{ + "streams": [ + { + "stream": { + "name": "customers", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "employees", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + } + ] +} diff --git a/airbyte-integrations/connectors/source-persistiq/integration_tests/invalid_config.json b/airbyte-integrations/connectors/source-persistiq/integration_tests/invalid_config.json new file mode 100644 index 0000000000000..f3732995784f2 --- /dev/null +++ b/airbyte-integrations/connectors/source-persistiq/integration_tests/invalid_config.json @@ -0,0 +1,3 @@ +{ + "todo-wrong-field": "this should be an incomplete config file, used in standard tests" +} diff --git 
a/airbyte-integrations/connectors/source-persistiq/integration_tests/sample_config.json b/airbyte-integrations/connectors/source-persistiq/integration_tests/sample_config.json new file mode 100644 index 0000000000000..ecc4913b84c74 --- /dev/null +++ b/airbyte-integrations/connectors/source-persistiq/integration_tests/sample_config.json @@ -0,0 +1,3 @@ +{ + "fix-me": "TODO" +} diff --git a/airbyte-integrations/connectors/source-persistiq/integration_tests/sample_state.json b/airbyte-integrations/connectors/source-persistiq/integration_tests/sample_state.json new file mode 100644 index 0000000000000..3587e579822d0 --- /dev/null +++ b/airbyte-integrations/connectors/source-persistiq/integration_tests/sample_state.json @@ -0,0 +1,5 @@ +{ + "todo-stream-name": { + "todo-field-name": "value" + } +} diff --git a/airbyte-integrations/connectors/source-persistiq/main.py b/airbyte-integrations/connectors/source-persistiq/main.py new file mode 100644 index 0000000000000..3f871b1110903 --- /dev/null +++ b/airbyte-integrations/connectors/source-persistiq/main.py @@ -0,0 +1,13 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# + + +import sys + +from airbyte_cdk.entrypoint import launch +from source_persistiq import SourcePersistiq + +if __name__ == "__main__": + source = SourcePersistiq() + launch(source, sys.argv[1:]) diff --git a/airbyte-integrations/connectors/source-persistiq/requirements.txt b/airbyte-integrations/connectors/source-persistiq/requirements.txt new file mode 100644 index 0000000000000..0411042aa0911 --- /dev/null +++ b/airbyte-integrations/connectors/source-persistiq/requirements.txt @@ -0,0 +1,2 @@ +-e ../../bases/source-acceptance-test +-e . diff --git a/airbyte-integrations/connectors/source-persistiq/setup.py b/airbyte-integrations/connectors/source-persistiq/setup.py new file mode 100644 index 0000000000000..9777bd59a2099 --- /dev/null +++ b/airbyte-integrations/connectors/source-persistiq/setup.py @@ -0,0 +1,29 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# + + +from setuptools import find_packages, setup + +MAIN_REQUIREMENTS = [ + "airbyte-cdk~=0.1", +] + +TEST_REQUIREMENTS = [ + "pytest~=6.1", + "pytest-mock~=3.6.1", + "source-acceptance-test", +] + +setup( + name="source_persistiq", + description="Source implementation for Persistiq.", + author="Airbyte", + author_email="contact@airbyte.io", + packages=find_packages(), + install_requires=MAIN_REQUIREMENTS, + package_data={"": ["*.json", "schemas/*.json", "schemas/shared/*.json"]}, + extras_require={ + "tests": TEST_REQUIREMENTS, + }, +) diff --git a/airbyte-integrations/connectors/source-persistiq/source_persistiq/__init__.py b/airbyte-integrations/connectors/source-persistiq/source_persistiq/__init__.py new file mode 100644 index 0000000000000..a3923d95bbe87 --- /dev/null +++ b/airbyte-integrations/connectors/source-persistiq/source_persistiq/__init__.py @@ -0,0 +1,8 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
+# + + +from .source import SourcePersistiq + +__all__ = ["SourcePersistiq"] diff --git a/airbyte-integrations/connectors/source-persistiq/source_persistiq/schemas/TODO.md b/airbyte-integrations/connectors/source-persistiq/source_persistiq/schemas/TODO.md new file mode 100644 index 0000000000000..cf1efadb3c9c9 --- /dev/null +++ b/airbyte-integrations/connectors/source-persistiq/source_persistiq/schemas/TODO.md @@ -0,0 +1,25 @@ +# TODO: Define your stream schemas +Your connector must describe the schema of each stream it can output using [JSONSchema](https://json-schema.org). + +The simplest way to do this is to describe the schema of your streams using one `.json` file per stream. You can also dynamically generate the schema of your stream in code, or you can combine both approaches: start with a `.json` file and dynamically add properties to it. + +The schema of a stream is the return value of `Stream.get_json_schema`. + +## Static schemas +By default, `Stream.get_json_schema` reads a `.json` file in the `schemas/` directory whose name is equal to the value of the `Stream.name` property. In turn `Stream.name` by default returns the name of the class in snake case. Therefore, if you have a class `class EmployeeBenefits(HttpStream)` the default behavior will look for a file called `schemas/employee_benefits.json`. You can override any of these behaviors as you need. + +Important note: any objects referenced via `$ref` should be placed in the `shared/` directory in their own `.json` files. + +## Dynamic schemas +If you'd rather define your schema in code, override `Stream.get_json_schema` in your stream class to return a `dict` describing the schema using [JSONSchema](https://json-schema.org). + +## Dynamically modifying static schemas +Override `Stream.get_json_schema` to run the default behavior, edit the returned value, then return the edited value: +``` +def get_json_schema(self): + schema = super().get_json_schema() + schema['dynamically_determined_property'] = "property" + return schema +``` + +Delete this file once you're done. Or don't. 
Up to you :) diff --git a/airbyte-integrations/connectors/source-persistiq/source_persistiq/schemas/customers.json b/airbyte-integrations/connectors/source-persistiq/source_persistiq/schemas/customers.json new file mode 100644 index 0000000000000..9a4b134858363 --- /dev/null +++ b/airbyte-integrations/connectors/source-persistiq/source_persistiq/schemas/customers.json @@ -0,0 +1,16 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": { + "type": ["null", "string"] + }, + "name": { + "type": ["null", "string"] + }, + "signup_date": { + "type": ["null", "string"], + "format": "date-time" + } + } +} diff --git a/airbyte-integrations/connectors/source-persistiq/source_persistiq/schemas/employees.json b/airbyte-integrations/connectors/source-persistiq/source_persistiq/schemas/employees.json new file mode 100644 index 0000000000000..2fa01a0fa1ff9 --- /dev/null +++ b/airbyte-integrations/connectors/source-persistiq/source_persistiq/schemas/employees.json @@ -0,0 +1,19 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": { + "type": ["null", "string"] + }, + "name": { + "type": ["null", "string"] + }, + "years_of_service": { + "type": ["null", "integer"] + }, + "start_date": { + "type": ["null", "string"], + "format": "date-time" + } + } +} diff --git a/airbyte-integrations/connectors/source-persistiq/source_persistiq/source.py b/airbyte-integrations/connectors/source-persistiq/source_persistiq/source.py new file mode 100644 index 0000000000000..210d27301b37e --- /dev/null +++ b/airbyte-integrations/connectors/source-persistiq/source_persistiq/source.py @@ -0,0 +1,206 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# + + +from abc import ABC +from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple + +import requests +from airbyte_cdk.sources import AbstractSource +from airbyte_cdk.sources.streams import Stream +from airbyte_cdk.sources.streams.http import HttpStream +from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator + +""" +TODO: Most comments in this class are instructive and should be deleted after the source is implemented. + +This file provides a stubbed example of how to use the Airbyte CDK to develop a source connector which supports full refresh and +incremental syncs from an HTTP API. + +The various TODOs are both implementation hints and steps - fulfilling all the TODOs should be sufficient to implement one basic and one incremental +stream from a source. This pattern is the same one used by Airbyte internally to implement connectors. + +The approach here is not authoritative, and devs are free to use their own judgement. + +There are additional required TODOs in the files within the integration_tests folder and the spec.json file. +""" + + +# Basic full refresh stream +class PersistiqStream(HttpStream, ABC): + """ + TODO remove this comment + + This class represents a stream output by the connector. + This is an abstract base class meant to contain all the common functionality at the API level e.g: the API base URL, pagination strategy, + parsing responses etc.. + + Each stream should extend this class (or another abstract subclass of it) to specify behavior unique to that stream. + + Typically for REST APIs each stream corresponds to a resource in the API.
For example if the API + contains the endpoints + - GET v1/customers + - GET v1/employees + + then you should have three classes: + `class PersistiqStream(HttpStream, ABC)` which is the current class + `class Customers(PersistiqStream)` contains behavior to pull data for customers using v1/customers + `class Employees(PersistiqStream)` contains behavior to pull data for employees using v1/employees + + If some streams implement incremental sync, it is typical to create another class + `class IncrementalPersistiqStream((PersistiqStream), ABC)` then have concrete stream implementations extend it. An example + is provided below. + + See the reference docs for the full list of configurable options. + """ + + # TODO: Fill in the url base. Required. + url_base = "https://example-api.com/v1/" + + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + """ + TODO: Override this method to define a pagination strategy. If you will not be using pagination, no action is required - just return None. + + This method should return a Mapping (e.g. a dict) containing whatever information is required to make paginated requests. This dict is passed + to most other methods in this class to help you form headers, request bodies, query params, etc. + + For example, if the API accepts a 'page' parameter to determine which page of the result to return, and a response from the API contains a + 'page' number, then this method should probably return a dict {'page': response.json()['page'] + 1} to increment the page count by 1. + The request_params method should then read the input next_page_token and set the 'page' param to next_page_token['page']. + + :param response: the most recent response from the API + :return If there is another page in the result, a mapping (e.g. a dict) containing information needed to query the next page in the response. + If there are no more pages in the result, return None. + """ + return None + + def request_params( + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None + ) -> MutableMapping[str, Any]: + """ + TODO: Override this method to define any query parameters to be set. Remove this method if you don't need to define request params. + Usually contains common params e.g. pagination size etc. + """ + return {} + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + """ + TODO: Override this method to define how a response is parsed. + :return an iterable containing each record in the response + """ + yield {} + + +class Customers(PersistiqStream): + """ + TODO: Change class name to match the table/data source this stream corresponds to. + """ + + # TODO: Fill in the primary key. Required. This is usually a unique field in the stream, like an ID or a timestamp. + primary_key = "customer_id" + + def path( + self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + ) -> str: + """ + TODO: Override this method to define the path this stream corresponds to. E.g. if the url is https://example-api.com/v1/customers then this + should return "customers". Required. + """ + return "customers" + + +# Basic incremental stream +class IncrementalPersistiqStream(PersistiqStream, ABC): + """ + TODO fill in details of this class to implement functionality related to incremental syncs for your connector. + If you do not need to implement incremental sync for any streams, remove this class.
+ """ + + # TODO: Fill in to checkpoint stream reads after N records. This prevents re-reading of data if the stream fails for any reason. + state_checkpoint_interval = None + + @property + def cursor_field(self) -> str: + """ + TODO + Override to return the cursor field used by this stream e.g: an API entity might always use created_at as the cursor field. This is + usually id or date based. This field's presence tells the framework this in an incremental stream. Required for incremental. + + :return str: The name of the cursor field. + """ + return [] + + def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: + """ + Override to determine the latest state after reading the latest record. This typically compared the cursor_field from the latest record and + the current state and picks the 'most' recent cursor. This is how a stream's state is determined. Required for incremental. + """ + return {} + + +class Employees(IncrementalPersistiqStream): + """ + TODO: Change class name to match the table/data source this stream corresponds to. + """ + + # TODO: Fill in the cursor_field. Required. + cursor_field = "start_date" + + # TODO: Fill in the primary key. Required. This is usually a unique field in the stream, like an ID or a timestamp. + primary_key = "employee_id" + + def path(self, **kwargs) -> str: + """ + TODO: Override this method to define the path this stream corresponds to. E.g. if the url is https://example-api.com/v1/employees then this should + return "single". Required. + """ + return "employees" + + def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, any]]]: + """ + TODO: Optionally override this method to define this stream's slices. If slicing is not needed, delete this method. + + Slices control when state is saved. Specifically, state is saved after a slice has been fully read. + This is useful if the API offers reads by groups or filters, and can be paired with the state object to make reads efficient. See the "concepts" + section of the docs for more information. + + The function is called before reading any records in a stream. It returns an Iterable of dicts, each containing the + necessary data to craft a request for a slice. The stream state is usually referenced to determine what slices need to be created. + This means that data in a slice is usually closely related to a stream's cursor_field and stream_state. + + An HTTP request is made for each returned slice. The same slice can be accessed in the path, request_params and request_header functions to help + craft that specific request. + + For example, if https://example-api.com/v1/employees offers a date query params that returns data for that particular day, one way to implement + this would be to consult the stream state object for the last synced date, then return a slice containing each date from the last synced date + till now. The request_params function would then grab the date from the stream_slice and make it part of the request by injecting it into + the date query param. 
+ """ + raise NotImplementedError("Implement stream slices or delete this method!") + + +# Source +class SourcePersistiq(AbstractSource): + def check_connection(self, logger, config) -> Tuple[bool, any]: + """ + TODO: Implement a connection check to validate that the user-provided config can be used to connect to the underlying API + + See https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-stripe/source_stripe/source.py#L232 + for an example. + + :param config: the user-input config object conforming to the connector's spec.json + :param logger: logger object + :return Tuple[bool, any]: (True, None) if the input config can be used to connect to the API successfully, (False, error) otherwise. + """ + return True, None + + def streams(self, config: Mapping[str, Any]) -> List[Stream]: + """ + TODO: Replace the streams below with your own streams. + + :param config: A Mapping of the user input configuration as defined in the connector spec. + """ + # TODO remove the authenticator if not required. + auth = TokenAuthenticator(token="api_key") # Oauth2Authenticator is also available if you need oauth support + return [Customers(authenticator=auth), Employees(authenticator=auth)] diff --git a/airbyte-integrations/connectors/source-persistiq/source_persistiq/spec.json b/airbyte-integrations/connectors/source-persistiq/source_persistiq/spec.json new file mode 100644 index 0000000000000..9a0a8657e3413 --- /dev/null +++ b/airbyte-integrations/connectors/source-persistiq/source_persistiq/spec.json @@ -0,0 +1,16 @@ +{ + "documentationUrl": "https://docsurl.com", + "connectionSpecification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Persistiq Spec", + "type": "object", + "required": ["TODO"], + "additionalProperties": false, + "properties": { + "TODO: This schema defines the configuration required for the source. This usually involves metadata such as database and/or authentication information.": { + "type": "string", + "description": "describe me" + } + } + } +} diff --git a/airbyte-integrations/connectors/source-persistiq/spec.json b/airbyte-integrations/connectors/source-persistiq/spec.json new file mode 100644 index 0000000000000..a7ae872372110 --- /dev/null +++ b/airbyte-integrations/connectors/source-persistiq/spec.json @@ -0,0 +1,17 @@ +{ + "documentationUrl": "https://docs.airbyte.io/integrations/sources/persistiq", + "connectionSpecification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "PersistIq spec", + "type": "object", + "required": ["api_key", "base"], + "additionalProperties": true, + "properties": { + "api_key": { + "type": "string", + "description": "PersistIq API Key. See the docs for more information on where to find that key.", + "airbyte_secret": true + } + } + } +} diff --git a/airbyte-integrations/connectors/source-persistiq/unit_tests/__init__.py b/airbyte-integrations/connectors/source-persistiq/unit_tests/__init__.py new file mode 100644 index 0000000000000..46b7376756ec6 --- /dev/null +++ b/airbyte-integrations/connectors/source-persistiq/unit_tests/__init__.py @@ -0,0 +1,3 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
+# diff --git a/airbyte-integrations/connectors/source-persistiq/unit_tests/test_incremental_streams.py b/airbyte-integrations/connectors/source-persistiq/unit_tests/test_incremental_streams.py new file mode 100644 index 0000000000000..bb73133ef1df1 --- /dev/null +++ b/airbyte-integrations/connectors/source-persistiq/unit_tests/test_incremental_streams.py @@ -0,0 +1,59 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# + + +from airbyte_cdk.models import SyncMode +from pytest import fixture +from source_persistiq.source import IncrementalPersistiqStream + + +@fixture +def patch_incremental_base_class(mocker): + # Mock abstract methods to enable instantiating abstract class + mocker.patch.object(IncrementalPersistiqStream, "path", "v0/example_endpoint") + mocker.patch.object(IncrementalPersistiqStream, "primary_key", "test_primary_key") + mocker.patch.object(IncrementalPersistiqStream, "__abstractmethods__", set()) + + +def test_cursor_field(patch_incremental_base_class): + stream = IncrementalPersistiqStream() + # TODO: replace this with your expected cursor field + expected_cursor_field = [] + assert stream.cursor_field == expected_cursor_field + + +def test_get_updated_state(patch_incremental_base_class): + stream = IncrementalPersistiqStream() + # TODO: replace this with your input parameters + inputs = {"current_stream_state": None, "latest_record": None} + # TODO: replace this with your expected updated stream state + expected_state = {} + assert stream.get_updated_state(**inputs) == expected_state + + +def test_stream_slices(patch_incremental_base_class): + stream = IncrementalPersistiqStream() + # TODO: replace this with your input parameters + inputs = {"sync_mode": SyncMode.incremental, "cursor_field": [], "stream_state": {}} + # TODO: replace this with your expected stream slices list + expected_stream_slice = [None] + assert stream.stream_slices(**inputs) == expected_stream_slice + + +def test_supports_incremental(patch_incremental_base_class, mocker): + mocker.patch.object(IncrementalPersistiqStream, "cursor_field", "dummy_field") + stream = IncrementalPersistiqStream() + assert stream.supports_incremental + + +def test_source_defined_cursor(patch_incremental_base_class): + stream = IncrementalPersistiqStream() + assert stream.source_defined_cursor + + +def test_stream_checkpoint_interval(patch_incremental_base_class): + stream = IncrementalPersistiqStream() + # TODO: replace this with your expected checkpoint interval + expected_checkpoint_interval = None + assert stream.state_checkpoint_interval == expected_checkpoint_interval diff --git a/airbyte-integrations/connectors/source-persistiq/unit_tests/test_source.py b/airbyte-integrations/connectors/source-persistiq/unit_tests/test_source.py new file mode 100644 index 0000000000000..f4c4d02840b8c --- /dev/null +++ b/airbyte-integrations/connectors/source-persistiq/unit_tests/test_source.py @@ -0,0 +1,22 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
+# + +from unittest.mock import MagicMock + +from source_persistiq.source import SourcePersistiq + + +def test_check_connection(mocker): + source = SourcePersistiq() + logger_mock, config_mock = MagicMock(), MagicMock() + assert source.check_connection(logger_mock, config_mock) == (True, None) + + +def test_streams(mocker): + source = SourcePersistiq() + config_mock = MagicMock() + streams = source.streams(config_mock) + # TODO: replace this with your expected number of streams + expected_streams_number = 2 + assert len(streams) == expected_streams_number diff --git a/airbyte-integrations/connectors/source-persistiq/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-persistiq/unit_tests/test_streams.py new file mode 100644 index 0000000000000..8fc9c9549fdfe --- /dev/null +++ b/airbyte-integrations/connectors/source-persistiq/unit_tests/test_streams.py @@ -0,0 +1,83 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# + +from http import HTTPStatus +from unittest.mock import MagicMock + +import pytest +from source_persistiq.source import PersistiqStream + + +@pytest.fixture +def patch_base_class(mocker): + # Mock abstract methods to enable instantiating abstract class + mocker.patch.object(PersistiqStream, "path", "v0/example_endpoint") + mocker.patch.object(PersistiqStream, "primary_key", "test_primary_key") + mocker.patch.object(PersistiqStream, "__abstractmethods__", set()) + + +def test_request_params(patch_base_class): + stream = PersistiqStream() + # TODO: replace this with your input parameters + inputs = {"stream_slice": None, "stream_state": None, "next_page_token": None} + # TODO: replace this with your expected request parameters + expected_params = {} + assert stream.request_params(**inputs) == expected_params + + +def test_next_page_token(patch_base_class): + stream = PersistiqStream() + # TODO: replace this with your input parameters + inputs = {"response": MagicMock()} + # TODO: replace this with your expected next page token + expected_token = None + assert stream.next_page_token(**inputs) == expected_token + + +def test_parse_response(patch_base_class): + stream = PersistiqStream() + # TODO: replace this with your input parameters + inputs = {"response": MagicMock()} + # TODO: replace this with your expected parsed object + expected_parsed_object = {} + assert next(stream.parse_response(**inputs)) == expected_parsed_object + + +def test_request_headers(patch_base_class): + stream = PersistiqStream() + # TODO: replace this with your input parameters + inputs = {"stream_slice": None, "stream_state": None, "next_page_token": None} + # TODO: replace this with your expected request headers + expected_headers = {} + assert stream.request_headers(**inputs) == expected_headers + + +def test_http_method(patch_base_class): + stream = PersistiqStream() + # TODO: replace this with your expected HTTP request method + expected_method = "GET" + assert stream.http_method == expected_method + + +@pytest.mark.parametrize( + ("http_status", "should_retry"), + [ + (HTTPStatus.OK, False), + (HTTPStatus.BAD_REQUEST, False), + (HTTPStatus.TOO_MANY_REQUESTS, True), + (HTTPStatus.INTERNAL_SERVER_ERROR, True), + ], +) +def test_should_retry(patch_base_class, http_status, should_retry): + response_mock = MagicMock() + response_mock.status_code = http_status + stream = PersistiqStream() + assert stream.should_retry(response_mock) == should_retry + + +def test_backoff_time(patch_base_class): + response_mock = MagicMock() + stream = PersistiqStream() + expected_backoff_time = None +
assert stream.backoff_time(response_mock) == expected_backoff_time From 71ce269da62d0638653ab087c7ac05076c93c96f Mon Sep 17 00:00:00 2001 From: Wadii Date: Wed, 12 Jan 2022 16:03:53 +0100 Subject: [PATCH 02/22] Implement check_connection for persistiq --- .../source_persistiq/schemas/customers.json | 16 ------- .../source_persistiq/source.py | 44 ++++++++----------- .../source_persistiq/spec.json | 9 ++-- .../connectors/source-persistiq/spec.json | 2 +- 4 files changed, 25 insertions(+), 46 deletions(-) delete mode 100644 airbyte-integrations/connectors/source-persistiq/source_persistiq/schemas/customers.json diff --git a/airbyte-integrations/connectors/source-persistiq/source_persistiq/schemas/customers.json b/airbyte-integrations/connectors/source-persistiq/source_persistiq/schemas/customers.json deleted file mode 100644 index 9a4b134858363..0000000000000 --- a/airbyte-integrations/connectors/source-persistiq/source_persistiq/schemas/customers.json +++ /dev/null @@ -1,16 +0,0 @@ -{ - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "properties": { - "id": { - "type": ["null", "string"] - }, - "name": { - "type": ["null", "string"] - }, - "signup_date": { - "type": ["null", "string"], - "format": "date-time" - } - } -} diff --git a/airbyte-integrations/connectors/source-persistiq/source_persistiq/source.py b/airbyte-integrations/connectors/source-persistiq/source_persistiq/source.py index 210d27301b37e..7c628604085fa 100644 --- a/airbyte-integrations/connectors/source-persistiq/source_persistiq/source.py +++ b/airbyte-integrations/connectors/source-persistiq/source_persistiq/source.py @@ -55,8 +55,7 @@ class PersistiqStream(HttpStream, ABC): See the reference docs for the full list of configurable options. """ - # TODO: Fill in the url base. Required. - url_base = "https://example-api.com/v1/" + url_base = "https://api.persistiq.com/v1/" def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: """ @@ -89,25 +88,18 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp TODO: Override this method to define how a response is parsed. :return an iterable containing each record in the response """ + print('parsing') + print(response) yield {} -class Customers(PersistiqStream): - """ - TODO: Change class name to match the table/data source this stream corresponds to. - """ - - # TODO: Fill in the primary key. Required. This is usually a unique field in the stream, like an ID or a timestamp. - primary_key = "customer_id" +class Users(PersistiqStream): + primary_key = "id" def path( self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None ) -> str: - """ - TODO: Override this method to define the path this stream corresponds to. E.g. if the url is https://example-api.com/v1/customers then this - should return "customers". Required. - """ - return "customers" + return "users" # Basic incremental stream @@ -177,23 +169,23 @@ def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Ite till now. The request_params function would then grab the date from the stream_slice and make it part of the request by injecting it into the date query param. 
""" - raise NotImplementedError("Implement stream slices or delete this method!") + raise NotImplementedError( + "Implement stream slices or delete this method!") # Source class SourcePersistiq(AbstractSource): def check_connection(self, logger, config) -> Tuple[bool, any]: - """ - TODO: Implement a connection check to validate that the user-provided config can be used to connect to the underlying API - - See https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-stripe/source_stripe/source.py#L232 - for an example. - - :param config: the user-input config object conforming to the connector's spec.json - :param logger: logger object - :return Tuple[bool, any]: (True, None) if the input config can be used to connect to the API successfully, (False, error) otherwise. - """ - return True, None + headers = { + "x-api-key": config['api_key'] + } + url = "https://api.persistiq.com/v1/users" + try: + response = requests.get(url, headers=headers) + response.raise_for_status() + return True, None + except requests.exceptions.RequestException as e: + return False, e def streams(self, config: Mapping[str, Any]) -> List[Stream]: """ diff --git a/airbyte-integrations/connectors/source-persistiq/source_persistiq/spec.json b/airbyte-integrations/connectors/source-persistiq/source_persistiq/spec.json index 9a0a8657e3413..5d675f02ea7b1 100644 --- a/airbyte-integrations/connectors/source-persistiq/source_persistiq/spec.json +++ b/airbyte-integrations/connectors/source-persistiq/source_persistiq/spec.json @@ -4,12 +4,15 @@ "$schema": "http://json-schema.org/draft-07/schema#", "title": "Persistiq Spec", "type": "object", - "required": ["TODO"], + "required": [ + "api_key" + ], "additionalProperties": false, "properties": { - "TODO: This schema defines the configuration required for the source. This usually involves metadata such as database and/or authentication information.": { + "api_key": { "type": "string", - "description": "describe me" + "description": "PersistIq API Key. 
See the docs for more information on where to find that key.", + "airbyte_secret": true } } } diff --git a/airbyte-integrations/connectors/source-persistiq/spec.json b/airbyte-integrations/connectors/source-persistiq/spec.json index a7ae872372110..df8f34c74aed8 100644 --- a/airbyte-integrations/connectors/source-persistiq/spec.json +++ b/airbyte-integrations/connectors/source-persistiq/spec.json @@ -4,7 +4,7 @@ "$schema": "http://json-schema.org/draft-07/schema#", "title": "PersistIq spec", "type": "object", - "required": ["api_key", "base"], + "required": ["api_key"], "additionalProperties": true, "properties": { "api_key": { From 8ccafccec3b9b30534ddae294634145c62f79095 Mon Sep 17 00:00:00 2001 From: Wadii Date: Wed, 12 Jan 2022 16:59:30 +0100 Subject: [PATCH 03/22] Read Users data from persistiq --- .../source_persistiq/schemas/users.json | 25 ++++ .../source_persistiq/source.py | 136 +++++++----------- 2 files changed, 76 insertions(+), 85 deletions(-) create mode 100644 airbyte-integrations/connectors/source-persistiq/source_persistiq/schemas/users.json diff --git a/airbyte-integrations/connectors/source-persistiq/source_persistiq/schemas/users.json b/airbyte-integrations/connectors/source-persistiq/source_persistiq/schemas/users.json new file mode 100644 index 0000000000000..946b96503eec2 --- /dev/null +++ b/airbyte-integrations/connectors/source-persistiq/source_persistiq/schemas/users.json @@ -0,0 +1,25 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": { + "type": ["null", "string"] + }, + "name": { + "type": ["null", "string"] + }, + "email": { + "type": ["null", "string"], + "format": "email" + }, + "activated": { + "type": ["null", "boolean"] + }, + "default_mailbox_id": { + "type": ["null", "string"] + }, + "salesforce_id": { + "type": ["null", "string"] + } + } +} diff --git a/airbyte-integrations/connectors/source-persistiq/source_persistiq/source.py b/airbyte-integrations/connectors/source-persistiq/source_persistiq/source.py index 7c628604085fa..944f6cd7fa039 100644 --- a/airbyte-integrations/connectors/source-persistiq/source_persistiq/source.py +++ b/airbyte-integrations/connectors/source-persistiq/source_persistiq/source.py @@ -10,7 +10,7 @@ from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams.http import HttpStream -from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator +from airbyte_cdk.sources.streams.http.auth import NoAuth """ TODO: Most comments in this class are instructive and should be deleted after the source is implemented. @@ -29,67 +29,23 @@ # Basic full refresh stream class PersistiqStream(HttpStream, ABC): - """ - TODO remove this comment - - This class represents a stream output by the connector. - This is an abstract base class meant to contain all the common functionality at the API level e.g: the API base URL, pagination strategy, - parsing responses etc.. - - Each stream should extend this class (or another abstract subclass of it) to specify behavior unique to that stream. - - Typically for REST APIs each stream corresponds to a resource in the API. 
For example if the API - contains the endpoints - - GET v1/customers - - GET v1/employees - - then you should have three classes: - `class PersistiqStream(HttpStream, ABC)` which is the current class - `class Customers(PersistiqStream)` contains behavior to pull data for customers using v1/customers - `class Employees(PersistiqStream)` contains behavior to pull data for employees using v1/employees - - If some streams implement incremental sync, it is typical to create another class - `class IncrementalPersistiqStream((PersistiqStream), ABC)` then have concrete stream implementations extend it. An example - is provided below. - - See the reference docs for the full list of configurable options. - """ + def __init__( + self, api_key: str, **kwargs + ): + super().__init__(**kwargs) + self.api_key = api_key url_base = "https://api.persistiq.com/v1/" def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: - """ - TODO: Override this method to define a pagination strategy. If you will not be using pagination, no action is required - just return None. - - This method should return a Mapping (e.g: dict) containing whatever information required to make paginated requests. This dict is passed - to most other methods in this class to help you form headers, request bodies, query params, etc.. - - For example, if the API accepts a 'page' parameter to determine which page of the result to return, and a response from the API contains a - 'page' number, then this method should probably return a dict {'page': response.json()['page'] + 1} to increment the page count by 1. - The request_params method should then read the input next_page_token and set the 'page' param to next_page_token['page']. - - :param response: the most recent response from the API - :return If there is another page in the result, a mapping (e.g: dict) containing information needed to query the next page in the response. - If there are no more pages in the result, return None. - """ return None def request_params( self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None ) -> MutableMapping[str, Any]: - """ - TODO: Override this method to define any query parameters to be set. Remove this method if you don't need to define request params. - Usually contains common params e.g. pagination size etc. - """ return {} def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: - """ - TODO: Override this method to define how a response is parsed. - :return an iterable containing each record in the response - """ - print('parsing') - print(response) yield {} @@ -101,6 +57,15 @@ def path( ) -> str: return "users" + def request_headers( + self, + **kwargs + ) -> MutableMapping[str, Any]: + return {'x-api-key': self.api_key} + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + yield response.json() + # Basic incremental stream class IncrementalPersistiqStream(PersistiqStream, ABC): @@ -131,46 +96,46 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late return {} -class Employees(IncrementalPersistiqStream): - """ - TODO: Change class name to match the table/data source this stream corresponds to. - """ +# class Employees(IncrementalPersistiqStream): +# """ +# TODO: Change class name to match the table/data source this stream corresponds to. +# """ - # TODO: Fill in the cursor_field. Required. - cursor_field = "start_date" +# # TODO: Fill in the cursor_field. 
Required. +# cursor_field = "start_date" - # TODO: Fill in the primary key. Required. This is usually a unique field in the stream, like an ID or a timestamp. - primary_key = "employee_id" +# # TODO: Fill in the primary key. Required. This is usually a unique field in the stream, like an ID or a timestamp. +# primary_key = "employee_id" - def path(self, **kwargs) -> str: - """ - TODO: Override this method to define the path this stream corresponds to. E.g. if the url is https://example-api.com/v1/employees then this should - return "single". Required. - """ - return "employees" +# def path(self, **kwargs) -> str: +# """ +# TODO: Override this method to define the path this stream corresponds to. E.g. if the url is https://example-api.com/v1/employees then this should +# return "single". Required. +# """ +# return "employees" - def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, any]]]: - """ - TODO: Optionally override this method to define this stream's slices. If slicing is not needed, delete this method. +# def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, any]]]: +# """ +# TODO: Optionally override this method to define this stream's slices. If slicing is not needed, delete this method. - Slices control when state is saved. Specifically, state is saved after a slice has been fully read. - This is useful if the API offers reads by groups or filters, and can be paired with the state object to make reads efficient. See the "concepts" - section of the docs for more information. +# Slices control when state is saved. Specifically, state is saved after a slice has been fully read. +# This is useful if the API offers reads by groups or filters, and can be paired with the state object to make reads efficient. See the "concepts" +# section of the docs for more information. - The function is called before reading any records in a stream. It returns an Iterable of dicts, each containing the - necessary data to craft a request for a slice. The stream state is usually referenced to determine what slices need to be created. - This means that data in a slice is usually closely related to a stream's cursor_field and stream_state. +# The function is called before reading any records in a stream. It returns an Iterable of dicts, each containing the +# necessary data to craft a request for a slice. The stream state is usually referenced to determine what slices need to be created. +# This means that data in a slice is usually closely related to a stream's cursor_field and stream_state. - An HTTP request is made for each returned slice. The same slice can be accessed in the path, request_params and request_header functions to help - craft that specific request. +# An HTTP request is made for each returned slice. The same slice can be accessed in the path, request_params and request_header functions to help +# craft that specific request. - For example, if https://example-api.com/v1/employees offers a date query params that returns data for that particular day, one way to implement - this would be to consult the stream state object for the last synced date, then return a slice containing each date from the last synced date - till now. The request_params function would then grab the date from the stream_slice and make it part of the request by injecting it into - the date query param. 
- """ - raise NotImplementedError( - "Implement stream slices or delete this method!") +# For example, if https://example-api.com/v1/employees offers a date query params that returns data for that particular day, one way to implement +# this would be to consult the stream state object for the last synced date, then return a slice containing each date from the last synced date +# till now. The request_params function would then grab the date from the stream_slice and make it part of the request by injecting it into +# the date query param. +# """ +# raise NotImplementedError( +# "Implement stream slices or delete this method!") # Source @@ -193,6 +158,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: :param config: A Mapping of the user input configuration as defined in the connector spec. """ - # TODO remove the authenticator if not required. - auth = TokenAuthenticator(token="api_key") # Oauth2Authenticator is also available if you need oauth support - return [Customers(authenticator=auth), Employees(authenticator=auth)] + + # auth = {"x-api-key": config['api_key']} + auth = NoAuth() + return [Users(authenticator=auth, api_key=config['api_key'])] From 2a3ffd528dbaa82d7335f2c1690e05494243963b Mon Sep 17 00:00:00 2001 From: Wadii Date: Thu, 13 Jan 2022 14:36:44 +0100 Subject: [PATCH 04/22] Read Leads data from persistiq and pagination --- .../source_persistiq/schemas/employees.json | 19 -- .../source_persistiq/schemas/leads.json | 177 ++++++++++++++++++ .../source_persistiq/source.py | 52 ++++- 3 files changed, 228 insertions(+), 20 deletions(-) delete mode 100644 airbyte-integrations/connectors/source-persistiq/source_persistiq/schemas/employees.json create mode 100644 airbyte-integrations/connectors/source-persistiq/source_persistiq/schemas/leads.json diff --git a/airbyte-integrations/connectors/source-persistiq/source_persistiq/schemas/employees.json b/airbyte-integrations/connectors/source-persistiq/source_persistiq/schemas/employees.json deleted file mode 100644 index 2fa01a0fa1ff9..0000000000000 --- a/airbyte-integrations/connectors/source-persistiq/source_persistiq/schemas/employees.json +++ /dev/null @@ -1,19 +0,0 @@ -{ - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "properties": { - "id": { - "type": ["null", "string"] - }, - "name": { - "type": ["null", "string"] - }, - "years_of_service": { - "type": ["null", "integer"] - }, - "start_date": { - "type": ["null", "string"], - "format": "date-time" - } - } -} diff --git a/airbyte-integrations/connectors/source-persistiq/source_persistiq/schemas/leads.json b/airbyte-integrations/connectors/source-persistiq/source_persistiq/schemas/leads.json new file mode 100644 index 0000000000000..d4aef8cec6c6b --- /dev/null +++ b/airbyte-integrations/connectors/source-persistiq/source_persistiq/schemas/leads.json @@ -0,0 +1,177 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": { + "type": [ + "null", + "string" + ] + }, + "bounced": { + "type": [ + "null", + "boolean" + ] + }, + "owner_id": { + "type": [ + "null", + "string" + ] + }, + "optedout": { + "type": [ + "null", + "boolean" + ] + }, + "sent_count": { + "type": [ + "null", + "integer" + ] + }, + "replied_count": { + "type": [ + "null", + "integer" + ] + }, + "last_sent_at": { + "type": [ + "null", + "string" + ] + }, + "status": { + "type": [ + "null", + "string" + ] + }, + "data": { + "company_name": { + "type": [ + "null", + "string" + ] + }, + "email": { + "type": [ + "null", + 
"string" + ], + "format": "email" + }, + "first_name": { + "type": [ + "null", + "string" + ] + }, + "last_name": { + "type": [ + "null", + "string" + ] + }, + "address": { + "type": [ + "null", + "string" + ] + }, + "city": { + "type": [ + "null", + "string" + ] + }, + "state": { + "type": [ + "null", + "string" + ] + }, + "phone": { + "type": [ + "null", + "string" + ] + }, + "title": { + "type": [ + "null", + "string" + ] + }, + "industry": { + "type": [ + "null", + "string" + ] + }, + "snippet": { + "type": [ + "null", + "string" + ] + }, + "snippet1": { + "type": [ + "null", + "string" + ] + }, + "snippet2": { + "type": [ + "null", + "string" + ] + }, + "snippet3": { + "type": [ + "null", + "string" + ] + }, + "snippet4": { + "type": [ + "null", + "string" + ] + }, + "twitch_name": { + "type": [ + "null", + "string" + ] + }, + "linkedin": { + "type": [ + "null", + "string" + ] + }, + "twitter": { + "type": [ + "null", + "string" + ] + }, + "facebook": { + "type": [ + "null", + "string" + ] + }, + "salesforce_id": { + "type": [ + "null", + "string" + ] + } + } + } +} diff --git a/airbyte-integrations/connectors/source-persistiq/source_persistiq/source.py b/airbyte-integrations/connectors/source-persistiq/source_persistiq/source.py index 944f6cd7fa039..3c2404a7111e6 100644 --- a/airbyte-integrations/connectors/source-persistiq/source_persistiq/source.py +++ b/airbyte-integrations/connectors/source-persistiq/source_persistiq/source.py @@ -52,6 +52,22 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp class Users(PersistiqStream): primary_key = "id" + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + json_response = response.json() + if not json_response.get("has_more", False): + return None + + return { + "page": json_response.get("next_page")[-1] + } + + def request_params( + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None + ) -> MutableMapping[str, Any]: + return { + "page": 1 if not next_page_token else next_page_token["page"] + } + def path( self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None ) -> str: @@ -67,6 +83,40 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp yield response.json() +class Leads(PersistiqStream): + primary_key = "id" + + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + json_response = response.json() + if not json_response.get("has_more", False): + return None + + return { + "page": json_response.get("next_page")[-1] + } + + def request_params( + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None + ) -> MutableMapping[str, Any]: + return { + "page": 1 if not next_page_token else next_page_token["page"] + } + + def path( + self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + ) -> str: + return "leads" + + def request_headers( + self, + **kwargs + ) -> MutableMapping[str, Any]: + return {'x-api-key': self.api_key} + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + yield response.json() + + # Basic incremental stream class IncrementalPersistiqStream(PersistiqStream, ABC): """ @@ -161,4 +211,4 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: # auth = {"x-api-key": 
config['api_key']} auth = NoAuth() - return [Users(authenticator=auth, api_key=config['api_key'])] + return [Users(authenticator=auth, api_key=config['api_key']), Leads(authenticator=auth, api_key=config['api_key'])] From 82cb3a17a61ae16671d0a915db71bc7230d200fe Mon Sep 17 00:00:00 2001 From: Wadii Date: Thu, 13 Jan 2022 14:51:43 +0100 Subject: [PATCH 05/22] Read Campaigns data from persistiq --- .../source_persistiq/schemas/campaigns.json | 81 +++++++++++++++++++ .../source_persistiq/source.py | 79 +++++++++--------- 2 files changed, 117 insertions(+), 43 deletions(-) create mode 100644 airbyte-integrations/connectors/source-persistiq/source_persistiq/schemas/campaigns.json diff --git a/airbyte-integrations/connectors/source-persistiq/source_persistiq/schemas/campaigns.json b/airbyte-integrations/connectors/source-persistiq/source_persistiq/schemas/campaigns.json new file mode 100644 index 0000000000000..a554a251f3ca6 --- /dev/null +++ b/airbyte-integrations/connectors/source-persistiq/source_persistiq/schemas/campaigns.json @@ -0,0 +1,81 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": { + "type": [ + "null", + "string" + ] + }, + "name": { + "type": [ + "null", + "string" + ] + }, + "creator":{"id": { + "type": [ + "null", + "string" + ] + }, + "name": { + "type": [ + "null", + "string" + ] + }, + "email": { + "type": [ + "null", + "string" + ] + } + }, + "stats": { + "prospects_contacted": { + "type": [ + "null", + "integer" + ] + }, + "prospects_reached": { + "type": [ + "null", + "integer" + ] + }, + "prospects_opened": { + "type": [ + "null", + "integer" + ] + }, + "prospects_replied": { + "type": [ + "null", + "integer" + ] + }, + "prospects_bounced": { + "type": [ + "null", + "integer" + ] + }, + "prospects_optedout": { + "type": [ + "null", + "integer" + ] + }, + "total_contacted": { + "type": [ + "null", + "integer" + ] + } + } + } +} diff --git a/airbyte-integrations/connectors/source-persistiq/source_persistiq/source.py b/airbyte-integrations/connectors/source-persistiq/source_persistiq/source.py index 3c2404a7111e6..2af42aff11249 100644 --- a/airbyte-integrations/connectors/source-persistiq/source_persistiq/source.py +++ b/airbyte-integrations/connectors/source-persistiq/source_persistiq/source.py @@ -117,6 +117,40 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp yield response.json() +class Campaigns(PersistiqStream): + primary_key = "id" + + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + json_response = response.json() + if not json_response.get("has_more", False): + return None + + return { + "page": json_response.get("next_page")[-1] + } + + def request_params( + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None + ) -> MutableMapping[str, Any]: + return { + "page": 1 if not next_page_token else next_page_token["page"] + } + + def path( + self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + ) -> str: + return "campaigns" + + def request_headers( + self, + **kwargs + ) -> MutableMapping[str, Any]: + return {'x-api-key': self.api_key} + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + yield response.json() + + # Basic incremental stream class IncrementalPersistiqStream(PersistiqStream, ABC): """ @@ -145,50 +179,9 @@ def get_updated_state(self, 
current_stream_state: MutableMapping[str, Any], late """ return {} - -# class Employees(IncrementalPersistiqStream): -# """ -# TODO: Change class name to match the table/data source this stream corresponds to. -# """ - -# # TODO: Fill in the cursor_field. Required. -# cursor_field = "start_date" - -# # TODO: Fill in the primary key. Required. This is usually a unique field in the stream, like an ID or a timestamp. -# primary_key = "employee_id" - -# def path(self, **kwargs) -> str: -# """ -# TODO: Override this method to define the path this stream corresponds to. E.g. if the url is https://example-api.com/v1/employees then this should -# return "single". Required. -# """ -# return "employees" - -# def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, any]]]: -# """ -# TODO: Optionally override this method to define this stream's slices. If slicing is not needed, delete this method. - -# Slices control when state is saved. Specifically, state is saved after a slice has been fully read. -# This is useful if the API offers reads by groups or filters, and can be paired with the state object to make reads efficient. See the "concepts" -# section of the docs for more information. - -# The function is called before reading any records in a stream. It returns an Iterable of dicts, each containing the -# necessary data to craft a request for a slice. The stream state is usually referenced to determine what slices need to be created. -# This means that data in a slice is usually closely related to a stream's cursor_field and stream_state. - -# An HTTP request is made for each returned slice. The same slice can be accessed in the path, request_params and request_header functions to help -# craft that specific request. - -# For example, if https://example-api.com/v1/employees offers a date query params that returns data for that particular day, one way to implement -# this would be to consult the stream state object for the last synced date, then return a slice containing each date from the last synced date -# till now. The request_params function would then grab the date from the stream_slice and make it part of the request by injecting it into -# the date query param. 
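One caveat in the streams added so far: `next_page_token` computes the next page as `json_response.get("next_page")[-1]`, the last character of the next-page URL, which returns the wrong token once the API passes page 9 (".../leads?page=10" yields "0"). A sturdier extraction, assuming `next_page` is always a full URL carrying a `page` query parameter as the unit tests suggest, parses the URL instead:

```python
from urllib.parse import parse_qs, urlparse


def next_page_number(next_page_url: str) -> str:
    # ".../v1/leads?page=12" -> "12"; slicing the raw string with [-1] would give "2".
    return parse_qs(urlparse(next_page_url).query)["page"][0]


assert next_page_number("https://api.persistiq.com/v1/leads?page=2") == "2"
assert next_page_number("https://api.persistiq.com/v1/leads?page=12") == "12"
```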
-# """ -# raise NotImplementedError( -# "Implement stream slices or delete this method!") +# Source -# Source class SourcePersistiq(AbstractSource): def check_connection(self, logger, config) -> Tuple[bool, any]: headers = { @@ -211,4 +204,4 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: # auth = {"x-api-key": config['api_key']} auth = NoAuth() - return [Users(authenticator=auth, api_key=config['api_key']), Leads(authenticator=auth, api_key=config['api_key'])] + return [Users(authenticator=auth, api_key=config['api_key']), Leads(authenticator=auth, api_key=config['api_key']), Campaigns(authenticator=auth, api_key=config['api_key'])] From 944c2363a14a8526aa4fae89a6fb9337e8698387 Mon Sep 17 00:00:00 2001 From: Wadii Date: Thu, 13 Jan 2022 14:55:31 +0100 Subject: [PATCH 06/22] Add persistiq logo --- .../init/src/main/resources/icons/persistiq.svg | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 airbyte-config/init/src/main/resources/icons/persistiq.svg diff --git a/airbyte-config/init/src/main/resources/icons/persistiq.svg b/airbyte-config/init/src/main/resources/icons/persistiq.svg new file mode 100644 index 0000000000000..d229d7b29a1d3 --- /dev/null +++ b/airbyte-config/init/src/main/resources/icons/persistiq.svg @@ -0,0 +1,8 @@ + + + + + + From 13dc51c96ddc9b55b3dae8cfef719a9146998fc7 Mon Sep 17 00:00:00 2001 From: Wadii Date: Thu, 13 Jan 2022 16:57:57 +0100 Subject: [PATCH 07/22] Completed unit tests --- .../integration_tests/catalog.json | 384 ++++++++++++++++-- .../source_persistiq/source.py | 147 ++----- .../unit_tests/test_incremental_streams.py | 59 --- .../unit_tests/test_source.py | 23 +- .../unit_tests/test_streams.py | 82 ++-- 5 files changed, 424 insertions(+), 271 deletions(-) delete mode 100644 airbyte-integrations/connectors/source-persistiq/unit_tests/test_incremental_streams.py diff --git a/airbyte-integrations/connectors/source-persistiq/integration_tests/catalog.json b/airbyte-integrations/connectors/source-persistiq/integration_tests/catalog.json index 6799946a68514..8454bbb8bff71 100644 --- a/airbyte-integrations/connectors/source-persistiq/integration_tests/catalog.json +++ b/airbyte-integrations/connectors/source-persistiq/integration_tests/catalog.json @@ -1,39 +1,351 @@ { - "streams": [ - { - "name": "TODO fix this file", - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": "column1", - "json_schema": { - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "properties": { - "column1": { - "type": "string" - }, - "column2": { - "type": "number" - } + "streams": [ + { + "stream": { + "name": "leads", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": { + "type": [ + "null", + "string" + ] + }, + "bounced": { + "type": [ + "null", + "boolean" + ] + }, + "owner_id": { + "type": [ + "null", + "string" + ] + }, + "optedout": { + "type": [ + "null", + "boolean" + ] + }, + "sent_count": { + "type": [ + "null", + "integer" + ] + }, + "replied_count": { + "type": [ + "null", + "integer" + ] + }, + "last_sent_at": { + "type": [ + "null", + "string" + ] + }, + "status": { + "type": [ + "null", + "string" + ] + }, + "data": { + "company_name": { + "type": [ + "null", + "string" + ] + }, + "email": { + "type": [ + "null", + "string" + ], + "format": "email" + }, + "first_name": { + "type": [ + "null", + "string" + ] + }, + "last_name": { + "type": [ + "null", + "string" + ] + }, + "address": 
{ + "type": [ + "null", + "string" + ] + }, + "city": { + "type": [ + "null", + "string" + ] + }, + "state": { + "type": [ + "null", + "string" + ] + }, + "phone": { + "type": [ + "null", + "string" + ] + }, + "title": { + "type": [ + "null", + "string" + ] + }, + "industry": { + "type": [ + "null", + "string" + ] + }, + "snippet": { + "type": [ + "null", + "string" + ] + }, + "snippet1": { + "type": [ + "null", + "string" + ] + }, + "snippet2": { + "type": [ + "null", + "string" + ] + }, + "snippet3": { + "type": [ + "null", + "string" + ] + }, + "snippet4": { + "type": [ + "null", + "string" + ] + }, + "twitch_name": { + "type": [ + "null", + "string" + ] + }, + "linkedin": { + "type": [ + "null", + "string" + ] + }, + "twitter": { + "type": [ + "null", + "string" + ] + }, + "facebook": { + "type": [ + "null", + "string" + ] + }, + "salesforce_id": { + "type": [ + "null", + "string" + ] + } + } + } + } + }, + "supported_sync_modes": [ + "full_refresh" + ], + "source_defined_primary_key": [ + [ + "id" + ] + ], + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "campaigns", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": { + "type": [ + "null", + "string" + ] + }, + "name": { + "type": [ + "null", + "string" + ] + }, + "creator": { + "id": { + "type": [ + "null", + "string" + ] + }, + "name": { + "type": [ + "null", + "string" + ] + }, + "email": { + "type": [ + "null", + "string" + ] + } + }, + "stats": { + "prospects_contacted": { + "type": [ + "null", + "integer" + ] + }, + "prospects_reached": { + "type": [ + "null", + "integer" + ] + }, + "prospects_opened": { + "type": [ + "null", + "integer" + ] + }, + "prospects_replied": { + "type": [ + "null", + "integer" + ] + }, + "prospects_bounced": { + "type": [ + "null", + "integer" + ] + }, + "prospects_optedout": { + "type": [ + "null", + "integer" + ] + }, + "total_contacted": { + "type": [ + "null", + "integer" + ] + } + } + } + } + }, + "supported_sync_modes": [ + "full_refresh" + ], + "source_defined_primary_key": [ + [ + "id" + ] + ], + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "users", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": { + "type": [ + "null", + "string" + ] + }, + "name": { + "type": [ + "null", + "string" + ] + }, + "email": { + "type": [ + "null", + "string" + ], + "format": "email" + }, + "activated": { + "type": [ + "null", + "boolean" + ] + }, + "default_mailbox_id": { + "type": [ + "null", + "string" + ] + }, + "salesforce_id": { + "type": [ + "null", + "string" + ] + } + } + } + }, + "supported_sync_modes": [ + "full_refresh" + ], + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite", + "source_defined_primary_key": [ + [ + "id" + ] + ] } - } - }, - { - "name": "table1", - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": false, - "json_schema": { - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "properties": { - "column1": { - "type": "string" - }, - "column2": { - "type": "number" - } - } - } - } - ] + ] } diff --git a/airbyte-integrations/connectors/source-persistiq/source_persistiq/source.py b/airbyte-integrations/connectors/source-persistiq/source_persistiq/source.py index 2af42aff11249..78a6250caaf2c 100644 --- 
a/airbyte-integrations/connectors/source-persistiq/source_persistiq/source.py
+++ b/airbyte-integrations/connectors/source-persistiq/source_persistiq/source.py
@@ -29,164 +29,66 @@
 # Basic full refresh stream
 class PersistiqStream(HttpStream, ABC):
-    def __init__(
-        self, api_key: str, **kwargs
-    ):
+    def __init__(self, api_key: str, **kwargs):
         super().__init__(**kwargs)
         self.api_key = api_key
 
     url_base = "https://api.persistiq.com/v1/"
 
-    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
-        return None
-
-    def request_params(
-        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
-    ) -> MutableMapping[str, Any]:
-        return {}
-
-    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
-        yield {}
-
-
-class Users(PersistiqStream):
-    primary_key = "id"
-
     def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
         json_response = response.json()
         if not json_response.get("has_more", False):
             return None
 
-        return {
-            "page": json_response.get("next_page")[-1]
-        }
+        # next_page is a URL such as .../users?page=2; extract the page number
+        # rather than the last character so pages beyond 9 paginate correctly.
+        return {"page": json_response.get("next_page").split("page=")[-1]}
 
     def request_params(
-        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
+        self, next_page_token: Mapping[str, Any] = None, **kwargs
     ) -> MutableMapping[str, Any]:
-        return {
-            "page": 1 if not next_page_token else next_page_token["page"]
-        }
+        return {"page": 1 if not next_page_token else next_page_token["page"]}
 
-    def path(
-        self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
-    ) -> str:
-        return "users"
+    def request_headers(self, **kwargs) -> MutableMapping[str, Any]:
+        return {"x-api-key": self.api_key}
 
-    def request_headers(
-        self,
-        **kwargs
-    ) -> MutableMapping[str, Any]:
-        return {'x-api-key': self.api_key}
 
     def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
         yield response.json()
 
 
-class Leads(PersistiqStream):
+class Users(PersistiqStream):
     primary_key = "id"
 
-    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
-        json_response = response.json()
-        if not json_response.get("has_more", False):
-            return None
+    def path(self, **kwargs) -> str:
+        return "users"
 
-        return {
-            "page": json_response.get("next_page")[-1]
-        }
 
-    def request_params(
-        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
-    ) -> MutableMapping[str, Any]:
-        return {
-            "page": 1 if not next_page_token else next_page_token["page"]
-        }
+class Leads(PersistiqStream):
+    primary_key = "id"
 
     def path(
-        self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
+        self, **kwargs
     ) -> str:
         return "leads"
 
-    def request_headers(
-        self,
-        **kwargs
-    ) -> MutableMapping[str, Any]:
-        return {'x-api-key': self.api_key}
-
-    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
-        yield response.json()
-
 
 class Campaigns(PersistiqStream):
     primary_key = "id"
 
-    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
-        json_response = response.json()
-        if not json_response.get("has_more", False):
-            return None
-
-        return {
-            "page": json_response.get("next_page")[-1]
-        }
-
-    def request_params(
-        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
-    ) -> MutableMapping[str, Any]:
-        return {
-            "page": 1 if not next_page_token else next_page_token["page"]
-        }
 
     def path(
-        self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
+        self, **kwargs
     ) -> str:
         return "campaigns"
 
-    def request_headers(
-        self,
-        **kwargs
-    ) -> MutableMapping[str, Any]:
-        return {'x-api-key': self.api_key}
-
-    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
-        yield response.json()
-
-
-# Basic incremental stream
-class IncrementalPersistiqStream(PersistiqStream, ABC):
-    """
-    TODO fill in details of this class to implement functionality related to incremental syncs for your connector.
-    if you do not need to implement incremental sync for any streams, remove this class.
-    """
-
-    # TODO: Fill in to checkpoint stream reads after N records. This prevents re-reading of data if the stream fails for any reason.
-    state_checkpoint_interval = None
-
-    @property
-    def cursor_field(self) -> str:
-        """
-        TODO
-        Override to return the cursor field used by this stream e.g: an API entity might always use created_at as the cursor field. This is
-        usually id or date based. This field's presence tells the framework this in an incremental stream. Required for incremental.
-
-        :return str: The name of the cursor field.
-        """
-        return []
-
-    def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
-        """
-        Override to determine the latest state after reading the latest record. This typically compared the cursor_field from the latest record and
-        the current state and picks the 'most' recent cursor. This is how a stream's state is determined. Required for incremental.
-        """
-        return {}
-
 
 # Source
 class SourcePersistiq(AbstractSource):
     def check_connection(self, logger, config) -> Tuple[bool, any]:
-        headers = {
-            "x-api-key": config['api_key']
-        }
+        headers = {"x-api-key": config["api_key"]}
         url = "https://api.persistiq.com/v1/users"
         try:
             response = requests.get(url, headers=headers)
@@ -196,12 +98,9 @@ def check_connection(self, logger, config) -> Tuple[bool, any]:
         return False, e
 
     def streams(self, config: Mapping[str, Any]) -> List[Stream]:
-        """
-        TODO: Replace the streams below with your own streams.
-
-        :param config: A Mapping of the user input configuration as defined in the connector spec.
-        """
-
-        # auth = {"x-api-key": config['api_key']}
         auth = NoAuth()
-        return [Users(authenticator=auth, api_key=config['api_key']), Leads(authenticator=auth, api_key=config['api_key']), Campaigns(authenticator=auth, api_key=config['api_key'])]
+        return [
+            Users(authenticator=auth, api_key=config["api_key"]),
+            Leads(authenticator=auth, api_key=config["api_key"]),
+            Campaigns(authenticator=auth, api_key=config["api_key"]),
+        ]
diff --git a/airbyte-integrations/connectors/source-persistiq/unit_tests/test_incremental_streams.py b/airbyte-integrations/connectors/source-persistiq/unit_tests/test_incremental_streams.py
deleted file mode 100644
index bb73133ef1df1..0000000000000
--- a/airbyte-integrations/connectors/source-persistiq/unit_tests/test_incremental_streams.py
+++ /dev/null
@@ -1,59 +0,0 @@
-#
-# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
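With the incremental scaffolding deleted, the connector reduces to three full-refresh streams sharing one base class that owns pagination and the `x-api-key` header. A small sketch of how the wired-up source behaves, assuming it is run from the connector directory with the CDK installed and using the placeholder key from the unit tests:

```python
from source_persistiq.source import SourcePersistiq

source = SourcePersistiq()
config = {"api_key": "mybeautifulkey"}  # placeholder key from the unit tests

for stream in source.streams(config):
    # The CDK derives stream names from the class names.
    print(stream.name, "->", stream.url_base + stream.path())
# Expected output:
# users -> https://api.persistiq.com/v1/users
# leads -> https://api.persistiq.com/v1/leads
# campaigns -> https://api.persistiq.com/v1/campaigns
```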
-# - - -from airbyte_cdk.models import SyncMode -from pytest import fixture -from source_persistiq.source import IncrementalPersistiqStream - - -@fixture -def patch_incremental_base_class(mocker): - # Mock abstract methods to enable instantiating abstract class - mocker.patch.object(IncrementalPersistiqStream, "path", "v0/example_endpoint") - mocker.patch.object(IncrementalPersistiqStream, "primary_key", "test_primary_key") - mocker.patch.object(IncrementalPersistiqStream, "__abstractmethods__", set()) - - -def test_cursor_field(patch_incremental_base_class): - stream = IncrementalPersistiqStream() - # TODO: replace this with your expected cursor field - expected_cursor_field = [] - assert stream.cursor_field == expected_cursor_field - - -def test_get_updated_state(patch_incremental_base_class): - stream = IncrementalPersistiqStream() - # TODO: replace this with your input parameters - inputs = {"current_stream_state": None, "latest_record": None} - # TODO: replace this with your expected updated stream state - expected_state = {} - assert stream.get_updated_state(**inputs) == expected_state - - -def test_stream_slices(patch_incremental_base_class): - stream = IncrementalPersistiqStream() - # TODO: replace this with your input parameters - inputs = {"sync_mode": SyncMode.incremental, "cursor_field": [], "stream_state": {}} - # TODO: replace this with your expected stream slices list - expected_stream_slice = [None] - assert stream.stream_slices(**inputs) == expected_stream_slice - - -def test_supports_incremental(patch_incremental_base_class, mocker): - mocker.patch.object(IncrementalPersistiqStream, "cursor_field", "dummy_field") - stream = IncrementalPersistiqStream() - assert stream.supports_incremental - - -def test_source_defined_cursor(patch_incremental_base_class): - stream = IncrementalPersistiqStream() - assert stream.source_defined_cursor - - -def test_stream_checkpoint_interval(patch_incremental_base_class): - stream = IncrementalPersistiqStream() - # TODO: replace this with your expected checkpoint interval - expected_checkpoint_interval = None - assert stream.state_checkpoint_interval == expected_checkpoint_interval diff --git a/airbyte-integrations/connectors/source-persistiq/unit_tests/test_source.py b/airbyte-integrations/connectors/source-persistiq/unit_tests/test_source.py index f4c4d02840b8c..6bd75bc060663 100644 --- a/airbyte-integrations/connectors/source-persistiq/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-persistiq/unit_tests/test_source.py @@ -2,21 +2,34 @@ # Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
 #
 
-from unittest.mock import MagicMock
+from unittest.mock import ANY, MagicMock
 from source_persistiq.source import SourcePersistiq
 
 
-def test_check_connection(mocker):
+def test_check_connection(mocker, requests_mock):
     source = SourcePersistiq()
-    logger_mock, config_mock = MagicMock(), MagicMock()
+    logger_mock = MagicMock()
+    config_mock = {"api_key": "mybeautifulkey"}
+    # success
+    requests_mock.get("https://api.persistiq.com/v1/users", json={
+        "id": "u_3an2Jp",
+        "name": "Gabriel Rossmann",
+        "email": "gabriel@punctual.cc",
+        "activated": "true",
+        "default_mailbox_id": "mbox_38ymEp",
+        "salesforce_id": "",
+    })
     assert source.check_connection(logger_mock, config_mock) == (True, None)
+    # failure
+    requests_mock.get("https://api.persistiq.com/v1/users", status_code=500)
+    assert source.check_connection(logger_mock, config_mock) == (False, ANY)
+
 
 def test_streams(mocker):
     source = SourcePersistiq()
     config_mock = MagicMock()
     streams = source.streams(config_mock)
-    # TODO: replace this with your streams number
-    expected_streams_number = 2
+    expected_streams_number = 3
     assert len(streams) == expected_streams_number
diff --git a/airbyte-integrations/connectors/source-persistiq/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-persistiq/unit_tests/test_streams.py
index 8fc9c9549fdfe..abad97ab0a071 100644
--- a/airbyte-integrations/connectors/source-persistiq/unit_tests/test_streams.py
+++ b/airbyte-integrations/connectors/source-persistiq/unit_tests/test_streams.py
@@ -9,6 +9,17 @@
 from source_persistiq.source import PersistiqStream
 
 
+def mocked_requests_get(*args, **kwargs):
+    class MockResponse:
+        def __init__(self, json_data, status_code):
+            self.json_data = json_data
+            self.status_code = status_code
+
+        def json(self):
+            return self.json_data
+    return MockResponse(json_data=kwargs["json_data"], status_code=kwargs["status_code"])
+
+
 @pytest.fixture
 def patch_base_class(mocker):
     # Mock abstract methods to enable instantiating abstract class
@@ -18,66 +29,43 @@ def test_request_params(patch_base_class):
-    stream = PersistiqStream()
-    # TODO: replace this with your input parameters
-    inputs = {"stream_slice": None, "stream_state": None, "next_page_token": None}
-    # TODO: replace this with your expected request parameters
-    expected_params = {}
+    stream = PersistiqStream(api_key="mybeautifulkey")
+    inputs = {"next_page_token": {"page": 1}}
+    expected_params = {"page": 1}
     assert stream.request_params(**inputs) == expected_params
 
 
 def test_next_page_token(patch_base_class):
-    stream = PersistiqStream()
-    # TODO: replace this with your input parameters
-    inputs = {"response": MagicMock()}
-    # TODO: replace this with your expected next page token
+    stream = PersistiqStream(api_key="mybeautifulkey")
+    # With next page
+    response = mocked_requests_get(json_data={
+        "has_more": True, "next_page": "https://api.persistiq.com/v1/users?page=2"}, status_code=200)
+    expected_token = "2"
+    assert stream.next_page_token(response=response) == {
+        "page": expected_token}
+    # Without next page
+    response = mocked_requests_get(json_data={}, status_code=200)
     expected_token = None
-    assert stream.next_page_token(**inputs) == expected_token
+    assert stream.next_page_token(response=response) == expected_token
 
 
 def test_parse_response(patch_base_class):
-    stream = PersistiqStream()
-    # TODO: replace this with your input parameters
-    inputs = {"response": MagicMock()}
-    # TODO: replace this with your expected parced object
-    expected_parsed_object = {}
-
assert next(stream.parse_response(**inputs)) == expected_parsed_object + stream = PersistiqStream(api_key="mybeautifulkey") + response = mocked_requests_get(json_data={ + "users": [{"id": 1, "name": "John Doe"}]}, status_code=200) + expected_parsed_object = { + "users": [{"id": 1, "name": "John Doe"}]} + assert next(stream.parse_response(response=response) + ) == expected_parsed_object def test_request_headers(patch_base_class): - stream = PersistiqStream() - # TODO: replace this with your input parameters - inputs = {"stream_slice": None, "stream_state": None, "next_page_token": None} - # TODO: replace this with your expected request headers - expected_headers = {} - assert stream.request_headers(**inputs) == expected_headers + stream = PersistiqStream(api_key="mybeautifulkey") + expected_headers = {"x-api-key": "mybeautifulkey"} + assert stream.request_headers() == expected_headers def test_http_method(patch_base_class): - stream = PersistiqStream() - # TODO: replace this with your expected http request method + stream = PersistiqStream(api_key="mybeautifulkey") expected_method = "GET" assert stream.http_method == expected_method - - -@pytest.mark.parametrize( - ("http_status", "should_retry"), - [ - (HTTPStatus.OK, False), - (HTTPStatus.BAD_REQUEST, False), - (HTTPStatus.TOO_MANY_REQUESTS, True), - (HTTPStatus.INTERNAL_SERVER_ERROR, True), - ], -) -def test_should_retry(patch_base_class, http_status, should_retry): - response_mock = MagicMock() - response_mock.status_code = http_status - stream = PersistiqStream() - assert stream.should_retry(response_mock) == should_retry - - -def test_backoff_time(patch_base_class): - response_mock = MagicMock() - stream = PersistiqStream() - expected_backoff_time = None - assert stream.backoff_time(response_mock) == expected_backoff_time From ddc306c02cf909a30c8475342f617fa1ce506c78 Mon Sep 17 00:00:00 2001 From: Wadii Date: Thu, 13 Jan 2022 18:28:18 +0100 Subject: [PATCH 08/22] Source definitions for PersistIq --- .../init/src/main/resources/icons/persistiq.svg | 6 +++--- .../init/src/main/resources/seed/destination_specs.yaml | 2 +- .../init/src/main/resources/seed/source_definitions.yaml | 7 +++++++ .../connectors/source-persistiq/README.md | 1 + .../source-persistiq/source_persistiq/source.py | 8 ++++---- 5 files changed, 16 insertions(+), 8 deletions(-) diff --git a/airbyte-config/init/src/main/resources/icons/persistiq.svg b/airbyte-config/init/src/main/resources/icons/persistiq.svg index d229d7b29a1d3..e10a9374a771b 100644 --- a/airbyte-config/init/src/main/resources/icons/persistiq.svg +++ b/airbyte-config/init/src/main/resources/icons/persistiq.svg @@ -1,7 +1,7 @@ - - -
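The dropped `should_retry` and `backoff_time` tests only pinned down behavior inherited from the CDK's `HttpStream` (retry on 429 and 5XX, default exponential backoff), so nothing connector-specific is lost. If they were worth keeping, a version adapted to the new constructor, reusing the `patch_base_class` fixture from `test_streams.py`, could look like this sketch:

```python
from http import HTTPStatus
from unittest.mock import MagicMock

import pytest
from source_persistiq.source import PersistiqStream


@pytest.mark.parametrize(
    ("http_status", "should_retry"),
    [
        (HTTPStatus.OK, False),
        (HTTPStatus.BAD_REQUEST, False),
        (HTTPStatus.TOO_MANY_REQUESTS, True),  # 429 retried by the CDK default
        (HTTPStatus.INTERNAL_SERVER_ERROR, True),  # 5XX retried by the CDK default
    ],
)
def test_should_retry(patch_base_class, http_status, should_retry):
    response_mock = MagicMock()
    response_mock.status_code = http_status
    stream = PersistiqStream(api_key="mybeautifulkey")
    assert stream.should_retry(response_mock) == should_retry
```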