From 7ca371b26d16106310d4ef400e3a9175dc07ff13 Mon Sep 17 00:00:00 2001 From: danilo-dti <91986166+danilo-dti@users.noreply.github.com> Date: Mon, 14 Nov 2022 09:02:05 -0300 Subject: [PATCH] =?UTF-8?q?=F0=9F=8E=89=20New=20Source:=20Microsoft=20Data?= =?UTF-8?q?verse=20[python=20cdk]=20(#18646)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Connector files * Add test files * Add integration test config files * Multiple changes to make it on Airbyte standards * Cleaning up * More clean ups * More clean ups * Removed max pages * Remove unused variable * Correctly separating Full refresh and incremental * Removed unused variables * Fix full_refresh class * Better code for creating stream classes * Fixing review comments * Update docs and Enum class * Update type conversion function * Fix enum class and update docs * Update discover * Implemented some unit tests * Update discover * Update test_source * Increase discovery test timeout * Update configured_catalog * Fix default_cursor_field * Adding final unit tests * Update spec: set client_id and tenant_id as secrets * Update discover to deal with Lookup and Picklist types * Fix Lookup data type conversion * add microsoft dataverse to source def * run format * auto-bump connector version Co-authored-by: Marcelo Pio de Castro Co-authored-by: daniloss99 Co-authored-by: Marcos Marx Co-authored-by: marcosmarxm Co-authored-by: Octavia Squidington III --- .../resources/seed/source_definitions.yaml | 7 + .../src/main/resources/seed/source_specs.yaml | 48 ++++++ .../source-microsoft-dataverse/.dockerignore | 6 + .../source-microsoft-dataverse/Dockerfile | 38 +++++ .../source-microsoft-dataverse/README.md | 132 +++++++++++++++ .../acceptance-test-config.yml | 25 +++ .../acceptance-test-docker.sh | 16 ++ .../source-microsoft-dataverse/build.gradle | 9 ++ .../integration_tests/__init__.py | 3 + .../integration_tests/abnormal_state.json | 5 + .../integration_tests/acceptance.py | 16 ++ .../integration_tests/configured_catalog.json | 15 ++ .../integration_tests/invalid_config.json | 7 + .../integration_tests/sample_config.json | 7 + .../integration_tests/sample_state.json | 5 + .../source-microsoft-dataverse/main.py | 13 ++ .../requirements.txt | 2 + .../source-microsoft-dataverse/setup.py | 29 ++++ .../source_microsoft_dataverse/__init__.py | 8 + .../source_microsoft_dataverse/dataverse.py | 80 ++++++++++ .../source_microsoft_dataverse/source.py | 102 ++++++++++++ .../source_microsoft_dataverse/spec.yaml | 47 ++++++ .../source_microsoft_dataverse/streams.py | 150 ++++++++++++++++++ .../unit_tests/__init__.py | 3 + .../unit_tests/test_dataverse.py | 17 ++ .../unit_tests/test_incremental_streams.py | 110 +++++++++++++ .../unit_tests/test_source.py | 144 +++++++++++++++++ .../unit_tests/test_streams.py | 113 +++++++++++++ .../sources/microsoft-dataverse.md | 64 ++++++++ 29 files changed, 1221 insertions(+) create mode 100644 airbyte-integrations/connectors/source-microsoft-dataverse/.dockerignore create mode 100644 airbyte-integrations/connectors/source-microsoft-dataverse/Dockerfile create mode 100644 airbyte-integrations/connectors/source-microsoft-dataverse/README.md create mode 100644 airbyte-integrations/connectors/source-microsoft-dataverse/acceptance-test-config.yml create mode 100644 airbyte-integrations/connectors/source-microsoft-dataverse/acceptance-test-docker.sh create mode 100644 airbyte-integrations/connectors/source-microsoft-dataverse/build.gradle create mode 100644 
airbyte-integrations/connectors/source-microsoft-dataverse/integration_tests/__init__.py create mode 100644 airbyte-integrations/connectors/source-microsoft-dataverse/integration_tests/abnormal_state.json create mode 100644 airbyte-integrations/connectors/source-microsoft-dataverse/integration_tests/acceptance.py create mode 100644 airbyte-integrations/connectors/source-microsoft-dataverse/integration_tests/configured_catalog.json create mode 100644 airbyte-integrations/connectors/source-microsoft-dataverse/integration_tests/invalid_config.json create mode 100644 airbyte-integrations/connectors/source-microsoft-dataverse/integration_tests/sample_config.json create mode 100644 airbyte-integrations/connectors/source-microsoft-dataverse/integration_tests/sample_state.json create mode 100644 airbyte-integrations/connectors/source-microsoft-dataverse/main.py create mode 100644 airbyte-integrations/connectors/source-microsoft-dataverse/requirements.txt create mode 100644 airbyte-integrations/connectors/source-microsoft-dataverse/setup.py create mode 100644 airbyte-integrations/connectors/source-microsoft-dataverse/source_microsoft_dataverse/__init__.py create mode 100644 airbyte-integrations/connectors/source-microsoft-dataverse/source_microsoft_dataverse/dataverse.py create mode 100644 airbyte-integrations/connectors/source-microsoft-dataverse/source_microsoft_dataverse/source.py create mode 100644 airbyte-integrations/connectors/source-microsoft-dataverse/source_microsoft_dataverse/spec.yaml create mode 100644 airbyte-integrations/connectors/source-microsoft-dataverse/source_microsoft_dataverse/streams.py create mode 100644 airbyte-integrations/connectors/source-microsoft-dataverse/unit_tests/__init__.py create mode 100644 airbyte-integrations/connectors/source-microsoft-dataverse/unit_tests/test_dataverse.py create mode 100644 airbyte-integrations/connectors/source-microsoft-dataverse/unit_tests/test_incremental_streams.py create mode 100644 airbyte-integrations/connectors/source-microsoft-dataverse/unit_tests/test_source.py create mode 100644 airbyte-integrations/connectors/source-microsoft-dataverse/unit_tests/test_streams.py create mode 100644 docs/integrations/sources/microsoft-dataverse.md diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index c466c397af36e..5f9522b86cc14 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -884,6 +884,13 @@ icon: microsoft-teams.svg sourceType: api releaseStage: alpha +- name: Microsoft Dataverse + sourceDefinitionId: 9220e3de-3b60-4bb2-a46f-046d59ea235a + dockerRepository: airbyte/source-microsoft-dataverse + dockerImageTag: 0.1.0 + documentationUrl: https://docs.airbyte.com/integrations/sources/microsoft-dataverse + sourceType: api + releaseStage: alpha - name: Mixpanel sourceDefinitionId: 12928b32-bf0a-4f1e-964f-07e12e37153a dockerRepository: airbyte/source-mixpanel diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 986ac8a658660..46c54ee18cde0 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -7956,6 +7956,54 @@ path_in_connector_config: - "credentials" - "client_secret" +- dockerImage: "airbyte/source-microsoft-dataverse:0.1.0" + spec: + documentationUrl: 
"https://docs.airbyte.io/integrations/sources/microsoft-dataverse" + connectionSpecification: + $schema: "http://json-schema.org/draft-07/schema#" + title: "Microsoft Dataverse Spec" + type: "object" + required: + - "url" + - "tenant_id" + - "client_id" + - "client_secret_value" + additionalProperties: true + properties: + url: + type: "string" + description: "URL to Microsoft Dataverse API" + title: "URL" + examples: + - "https://.crm.dynamics.com" + order: 0 + tenant_id: + type: "string" + description: "Tenant Id of your Microsoft Dataverse Instance" + title: "Tenant Id" + airbyte_secret: true + order: 1 + client_id: + type: "string" + description: "App Registration Client Id" + title: "Client Id" + airbyte_secret: true + order: 2 + client_secret_value: + type: "string" + description: "App Registration Client Secret" + title: "Client Secret" + airbyte_secret: true + order: 3 + odata_maxpagesize: + type: "integer" + description: "Max number of results per page. Default=5000" + title: "Max page size" + default: 5000 + order: 4 + supportsNormalization: false + supportsDBT: false + supported_destination_sync_modes: [] - dockerImage: "airbyte/source-mixpanel:0.1.29" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/mixpanel" diff --git a/airbyte-integrations/connectors/source-microsoft-dataverse/.dockerignore b/airbyte-integrations/connectors/source-microsoft-dataverse/.dockerignore new file mode 100644 index 0000000000000..73db5322b3dcd --- /dev/null +++ b/airbyte-integrations/connectors/source-microsoft-dataverse/.dockerignore @@ -0,0 +1,6 @@ +* +!Dockerfile +!main.py +!source_microsoft_dataverse +!setup.py +!secrets diff --git a/airbyte-integrations/connectors/source-microsoft-dataverse/Dockerfile b/airbyte-integrations/connectors/source-microsoft-dataverse/Dockerfile new file mode 100644 index 0000000000000..52c806d763a38 --- /dev/null +++ b/airbyte-integrations/connectors/source-microsoft-dataverse/Dockerfile @@ -0,0 +1,38 @@ +FROM python:3.9.13-alpine3.15 as base + +# build and load all requirements +FROM base as builder +WORKDIR /airbyte/integration_code + +# upgrade pip to the latest version +RUN apk --no-cache upgrade \ + && pip install --upgrade pip \ + && apk --no-cache add tzdata build-base + + +COPY setup.py ./ +# install necessary packages to a temporary folder +RUN pip install --prefix=/install . + +# build a clean environment +FROM base +WORKDIR /airbyte/integration_code + +# copy all loaded and built libraries to a pure basic image +COPY --from=builder /install /usr/local +# add default timezone settings +COPY --from=builder /usr/share/zoneinfo/Etc/UTC /etc/localtime +RUN echo "Etc/UTC" > /etc/timezone + +# bash is installed for more convenient debugging. +RUN apk --no-cache add bash + +# copy payload code only +COPY main.py ./ +COPY source_microsoft_dataverse ./source_microsoft_dataverse + +ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" +ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] + +LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.name=airbyte/source-microsoft-dataverse diff --git a/airbyte-integrations/connectors/source-microsoft-dataverse/README.md b/airbyte-integrations/connectors/source-microsoft-dataverse/README.md new file mode 100644 index 0000000000000..261fc5d4b899e --- /dev/null +++ b/airbyte-integrations/connectors/source-microsoft-dataverse/README.md @@ -0,0 +1,132 @@ +# Microsoft Dataverse Source + +This is the repository for the Microsoft Dataverse source connector, written in Python. 
+For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.io/integrations/sources/microsoft-dataverse). + +## Local development + +### Prerequisites +**To iterate on this connector, make sure to complete this prerequisites section.** + +#### Minimum Python version required `= 3.9.0` + +#### Build & Activate Virtual Environment and install dependencies +From this connector directory, create a virtual environment: +``` +python -m venv .venv +``` + +This will generate a virtualenv for this module in `.venv/`. Make sure this venv is active in your +development environment of choice. To activate it from the terminal, run: +``` +source .venv/bin/activate +pip install -r requirements.txt +pip install '.[tests]' +``` +If you are in an IDE, follow your IDE's instructions to activate the virtualenv. + +Note that while we are installing dependencies from `requirements.txt`, you should only edit `setup.py` for your dependencies. `requirements.txt` is +used for editable installs (`pip install -e`) to pull in Python dependencies from the monorepo and will call `setup.py`. +If this is mumbo jumbo to you, don't worry about it, just put your deps in `setup.py` but install using `pip install -r requirements.txt` and everything +should work as you expect. + +#### Building via Gradle +You can also build the connector in Gradle. This is typically used in CI and not needed for your development workflow. + +To build using Gradle, from the Airbyte repository root, run: +``` +./gradlew :airbyte-integrations:connectors:source-microsoft-dataverse:build +``` + +#### Create credentials +**If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.io/integrations/sources/microsoft-dataverse) +to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `source_microsoft_dataverse/spec.yaml` file. +Note that any directory named `secrets` is gitignored across the entire Airbyte repo, so there is no danger of accidentally checking in sensitive information. +See `integration_tests/sample_config.json` for a sample config file. + +**If you are an Airbyte core member**, copy the credentials in Lastpass under the secret name `source microsoft-dataverse test creds` +and place them into `secrets/config.json`. + +### Locally running the connector +``` +python main.py spec +python main.py check --config secrets/config.json +python main.py discover --config secrets/config.json +python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json +``` + +### Locally running the connector docker image + +#### Build +First, make sure you build the latest Docker image: +``` +docker build . -t airbyte/source-microsoft-dataverse:dev +``` + +You can also build the connector image via Gradle: +``` +./gradlew :airbyte-integrations:connectors:source-microsoft-dataverse:airbyteDocker +``` +When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in +the Dockerfile. 
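+
+You can also sanity-check the connector straight from Python, which is handy when debugging. A minimal sketch, assuming a valid `secrets/config.json` (the logger name is illustrative):
+```python
+import json
+import logging
+
+from source_microsoft_dataverse import SourceMicrosoftDataverse
+
+with open("secrets/config.json") as f:
+    config = json.load(f)
+
+source = SourceMicrosoftDataverse()
+# check_connection returns (True, None) on success, (False, error) otherwise
+ok, error = source.check_connection(logging.getLogger("airbyte"), config)
+print(ok, error)
+```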
+
+#### Run
+Then run any of the connector commands as follows:
+```
+docker run --rm airbyte/source-microsoft-dataverse:dev spec
+docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-microsoft-dataverse:dev check --config /secrets/config.json
+docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-microsoft-dataverse:dev discover --config /secrets/config.json
+docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/source-microsoft-dataverse:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json
+```
+## Testing
+Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named.
+First, install the test dependencies into your virtual environment:
+```
+pip install .[tests]
+```
+### Unit Tests
+To run unit tests locally, from the connector directory run:
+```
+python -m pytest unit_tests
+```
+
+### Integration Tests
+There are two types of integration tests: Acceptance Tests (Airbyte's test suite for all source connectors) and custom integration tests (which are specific to this connector).
+#### Custom Integration tests
+Place custom tests inside the `integration_tests/` folder, then, from the connector root, run:
+```
+python -m pytest integration_tests
+```
+#### Acceptance Tests
+Customize the `acceptance-test-config.yml` file to configure the tests. See [Source Acceptance Tests](https://docs.airbyte.com/connector-development/testing-connectors/source-acceptance-tests-reference) for more information.
+If your connector requires you to create or destroy resources for use during acceptance tests, create fixtures for them and place them inside `integration_tests/acceptance.py`.
+To run your integration tests with acceptance tests, from the connector root, run:
+```
+python -m pytest integration_tests -p integration_tests.acceptance
+```
+To run your integration tests with Docker, run `./acceptance-test-docker.sh` from the connector root.
+
+### Using Gradle to run tests
+All commands should be run from the Airbyte project root.
+To run unit tests:
+```
+./gradlew :airbyte-integrations:connectors:source-microsoft-dataverse:unitTest
+```
+To run acceptance and custom integration tests:
+```
+./gradlew :airbyte-integrations:connectors:source-microsoft-dataverse:integrationTest
+```
+
+## Dependency Management
+All of your dependencies should go in `setup.py`, NOT `requirements.txt`. The requirements file is only used to connect internal Airbyte dependencies in the monorepo for local development.
+We split dependencies into two groups:
+* Dependencies required for your connector to work go in the `MAIN_REQUIREMENTS` list.
+* Dependencies required for testing go in the `TEST_REQUIREMENTS` list.
+
+### Publishing a new version of the connector
+You've checked out the repo, implemented a million-dollar feature, and you're ready to share your changes with the world. Now what?
+1. Make sure your changes are passing unit and integration tests.
+1. Bump the connector version in the `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)).
+1. Create a Pull Request.
+1. Pat yourself on the back for being an awesome contributor.
+1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master.
diff --git a/airbyte-integrations/connectors/source-microsoft-dataverse/acceptance-test-config.yml b/airbyte-integrations/connectors/source-microsoft-dataverse/acceptance-test-config.yml new file mode 100644 index 0000000000000..38c7d1899a41c --- /dev/null +++ b/airbyte-integrations/connectors/source-microsoft-dataverse/acceptance-test-config.yml @@ -0,0 +1,25 @@ +# See [Source Acceptance Tests](https://docs.airbyte.com/connector-development/testing-connectors/source-acceptance-tests-reference) +# for more information about how to configure these tests +connector_image: airbyte/source-microsoft-dataverse:dev +tests: + spec: + - spec_path: "source_microsoft_dataverse/spec.yaml" + connection: + - config_path: "secrets/config.json" + status: "succeed" + - config_path: "integration_tests/invalid_config.json" + status: "failed" + discovery: + - config_path: "secrets/config.json" + timeout_seconds: 180 + basic_read: + - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" + empty_streams: [] + incremental: + - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" + future_state_path: "integration_tests/abnormal_state.json" + full_refresh: + - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" diff --git a/airbyte-integrations/connectors/source-microsoft-dataverse/acceptance-test-docker.sh b/airbyte-integrations/connectors/source-microsoft-dataverse/acceptance-test-docker.sh new file mode 100644 index 0000000000000..c51577d10690c --- /dev/null +++ b/airbyte-integrations/connectors/source-microsoft-dataverse/acceptance-test-docker.sh @@ -0,0 +1,16 @@ +#!/usr/bin/env sh + +# Build latest connector image +docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2-) + +# Pull latest acctest image +docker pull airbyte/source-acceptance-test:latest + +# Run +docker run --rm -it \ + -v /var/run/docker.sock:/var/run/docker.sock \ + -v /tmp:/tmp \ + -v $(pwd):/test_input \ + airbyte/source-acceptance-test \ + --acceptance-test-config /test_input + diff --git a/airbyte-integrations/connectors/source-microsoft-dataverse/build.gradle b/airbyte-integrations/connectors/source-microsoft-dataverse/build.gradle new file mode 100644 index 0000000000000..ba9d9a834a74d --- /dev/null +++ b/airbyte-integrations/connectors/source-microsoft-dataverse/build.gradle @@ -0,0 +1,9 @@ +plugins { + id 'airbyte-python' + id 'airbyte-docker' + id 'airbyte-source-acceptance-test' +} + +airbytePython { + moduleDirectory 'source_microsoft_dataverse' +} diff --git a/airbyte-integrations/connectors/source-microsoft-dataverse/integration_tests/__init__.py b/airbyte-integrations/connectors/source-microsoft-dataverse/integration_tests/__init__.py new file mode 100644 index 0000000000000..1100c1c58cf51 --- /dev/null +++ b/airbyte-integrations/connectors/source-microsoft-dataverse/integration_tests/__init__.py @@ -0,0 +1,3 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+# diff --git a/airbyte-integrations/connectors/source-microsoft-dataverse/integration_tests/abnormal_state.json b/airbyte-integrations/connectors/source-microsoft-dataverse/integration_tests/abnormal_state.json new file mode 100644 index 0000000000000..5ea158951cc13 --- /dev/null +++ b/airbyte-integrations/connectors/source-microsoft-dataverse/integration_tests/abnormal_state.json @@ -0,0 +1,5 @@ +{ + "": { + "$deltatoken": "12644418993!10/06/2050 20:06:12" + } +} diff --git a/airbyte-integrations/connectors/source-microsoft-dataverse/integration_tests/acceptance.py b/airbyte-integrations/connectors/source-microsoft-dataverse/integration_tests/acceptance.py new file mode 100644 index 0000000000000..1302b2f57e10e --- /dev/null +++ b/airbyte-integrations/connectors/source-microsoft-dataverse/integration_tests/acceptance.py @@ -0,0 +1,16 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + + +import pytest + +pytest_plugins = ("source_acceptance_test.plugin",) + + +@pytest.fixture(scope="session", autouse=True) +def connector_setup(): + """This fixture is a placeholder for external resources that acceptance test might require.""" + # TODO: setup test dependencies if needed. otherwise remove the TODO comments + yield + # TODO: clean up test dependencies diff --git a/airbyte-integrations/connectors/source-microsoft-dataverse/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-microsoft-dataverse/integration_tests/configured_catalog.json new file mode 100644 index 0000000000000..cc9f5ef9d2009 --- /dev/null +++ b/airbyte-integrations/connectors/source-microsoft-dataverse/integration_tests/configured_catalog.json @@ -0,0 +1,15 @@ +{ + "streams": [ + { + "stream": { + "name": "", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"] + }, + "cursor_field": ["modifiedon"], + "primary_key": [[""]], + "sync_mode": "incremental", + "destination_sync_mode": "append" + } + ] +} diff --git a/airbyte-integrations/connectors/source-microsoft-dataverse/integration_tests/invalid_config.json b/airbyte-integrations/connectors/source-microsoft-dataverse/integration_tests/invalid_config.json new file mode 100644 index 0000000000000..176722f0015ea --- /dev/null +++ b/airbyte-integrations/connectors/source-microsoft-dataverse/integration_tests/invalid_config.json @@ -0,0 +1,7 @@ +{ + "client_id": "", + "tenant_id": "", + "client_secret_value": "", + "url": "", + "odata.maxpagesize": 100 +} diff --git a/airbyte-integrations/connectors/source-microsoft-dataverse/integration_tests/sample_config.json b/airbyte-integrations/connectors/source-microsoft-dataverse/integration_tests/sample_config.json new file mode 100644 index 0000000000000..371ecbca0a6c4 --- /dev/null +++ b/airbyte-integrations/connectors/source-microsoft-dataverse/integration_tests/sample_config.json @@ -0,0 +1,7 @@ +{ + "url": "", + "client_id": "", + "tenant_id": "", + "client_secret_value": "", + "odata_maxpagesize": 5000 +} diff --git a/airbyte-integrations/connectors/source-microsoft-dataverse/integration_tests/sample_state.json b/airbyte-integrations/connectors/source-microsoft-dataverse/integration_tests/sample_state.json new file mode 100644 index 0000000000000..467d3cc4d96a6 --- /dev/null +++ b/airbyte-integrations/connectors/source-microsoft-dataverse/integration_tests/sample_state.json @@ -0,0 +1,5 @@ +{ + "": { + "$deltatoken": "12644418993!10/06/2022 20:06:12" + } +} diff --git a/airbyte-integrations/connectors/source-microsoft-dataverse/main.py 
b/airbyte-integrations/connectors/source-microsoft-dataverse/main.py new file mode 100644 index 0000000000000..6c673246b217d --- /dev/null +++ b/airbyte-integrations/connectors/source-microsoft-dataverse/main.py @@ -0,0 +1,13 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + + +import sys + +from airbyte_cdk.entrypoint import launch +from source_microsoft_dataverse import SourceMicrosoftDataverse + +if __name__ == "__main__": + source = SourceMicrosoftDataverse() + launch(source, sys.argv[1:]) diff --git a/airbyte-integrations/connectors/source-microsoft-dataverse/requirements.txt b/airbyte-integrations/connectors/source-microsoft-dataverse/requirements.txt new file mode 100644 index 0000000000000..0411042aa0911 --- /dev/null +++ b/airbyte-integrations/connectors/source-microsoft-dataverse/requirements.txt @@ -0,0 +1,2 @@ +-e ../../bases/source-acceptance-test +-e . diff --git a/airbyte-integrations/connectors/source-microsoft-dataverse/setup.py b/airbyte-integrations/connectors/source-microsoft-dataverse/setup.py new file mode 100644 index 0000000000000..2eb0ceeda35e7 --- /dev/null +++ b/airbyte-integrations/connectors/source-microsoft-dataverse/setup.py @@ -0,0 +1,29 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + + +from setuptools import find_packages, setup + +MAIN_REQUIREMENTS = [ + "airbyte-cdk~=0.2", +] + +TEST_REQUIREMENTS = [ + "pytest~=6.1", + "pytest-mock~=3.6.1", + "source-acceptance-test", +] + +setup( + name="source_microsoft_dataverse", + description="Source implementation for Microsoft Dataverse.", + author="Airbyte", + author_email="contact@airbyte.io", + packages=find_packages(), + install_requires=MAIN_REQUIREMENTS, + package_data={"": ["*.json", "*.yaml", "schemas/*.json", "schemas/shared/*.json"]}, + extras_require={ + "tests": TEST_REQUIREMENTS, + }, +) diff --git a/airbyte-integrations/connectors/source-microsoft-dataverse/source_microsoft_dataverse/__init__.py b/airbyte-integrations/connectors/source-microsoft-dataverse/source_microsoft_dataverse/__init__.py new file mode 100644 index 0000000000000..ac8f00705ef9e --- /dev/null +++ b/airbyte-integrations/connectors/source-microsoft-dataverse/source_microsoft_dataverse/__init__.py @@ -0,0 +1,8 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + + +from .source import SourceMicrosoftDataverse + +__all__ = ["SourceMicrosoftDataverse"] diff --git a/airbyte-integrations/connectors/source-microsoft-dataverse/source_microsoft_dataverse/dataverse.py b/airbyte-integrations/connectors/source-microsoft-dataverse/source_microsoft_dataverse/dataverse.py new file mode 100644 index 0000000000000..4581bc9af7c68 --- /dev/null +++ b/airbyte-integrations/connectors/source-microsoft-dataverse/source_microsoft_dataverse/dataverse.py @@ -0,0 +1,80 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
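+# Shared helpers for the connector: a client-credentials OAuth2 authenticator,
+# the Dataverse-to-Airbyte type mapping, and thin wrappers around the
+# Dataverse Web API.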
+# + +from enum import Enum +from typing import Any, Mapping, MutableMapping, Optional + +import requests +from airbyte_cdk.sources.streams.http.requests_native_auth.oauth import Oauth2Authenticator + + +class MicrosoftOauth2Authenticator(Oauth2Authenticator): + def build_refresh_request_body(self) -> Mapping[str, Any]: + """ + Returns the request body to set on the refresh request + """ + payload: MutableMapping[str, Any] = { + "grant_type": "client_credentials", + "client_id": self.get_client_id(), + "client_secret": self.get_client_secret(), + "scope": self.get_scopes(), + } + + return payload + + +class AirbyteType(Enum): + + String = {"type": ["null", "string"]} + Boolean = {"type": ["null", "boolean"]} + Timestamp = {"type": ["null", "string"], "format": "date-time", "airbyte_type": "timestamp_with_timezone"} + Integer = {"type": ["null", "integer"]} + Number = {"type": ["null", "number"]} + + +class DataverseType(Enum): + + String = AirbyteType.String + Uniqueidentifier = AirbyteType.String + DateTime = AirbyteType.Timestamp + Integer = AirbyteType.Integer + BigInt = AirbyteType.Integer + Money = AirbyteType.Number + Boolean = AirbyteType.Boolean + Double = AirbyteType.Number + Decimal = AirbyteType.Number + Status = AirbyteType.Integer + State = AirbyteType.Integer + Picklist = AirbyteType.Integer + Lookup = AirbyteType.String + Virtual = None + + +def get_auth(config: Mapping[str, Any]) -> MicrosoftOauth2Authenticator: + return MicrosoftOauth2Authenticator( + token_refresh_endpoint=f'https://login.microsoftonline.com/{config["tenant_id"]}/oauth2/v2.0/token', + client_id=config["client_id"], + client_secret=config["client_secret_value"], + scopes=[f'{config["url"]}/.default'], + refresh_token="", + ) + + +def do_request(config: Mapping[str, Any], path: str): + auth = get_auth(config) + headers = auth.get_auth_header() + # Call a protected API with the access token. + return requests.get( + config["url"] + "/api/data/v9.2/" + path, + headers=headers, + ) + + +def convert_dataverse_type(dataverse_type: str) -> Optional[dict]: + if dataverse_type in DataverseType.__members__: + enum_type = DataverseType[dataverse_type] + if enum_type: + return enum_type.value if enum_type.value is None else enum_type.value.value + + return AirbyteType.String.value diff --git a/airbyte-integrations/connectors/source-microsoft-dataverse/source_microsoft_dataverse/source.py b/airbyte-integrations/connectors/source-microsoft-dataverse/source_microsoft_dataverse/source.py new file mode 100644 index 0000000000000..ed99f1f51b5fb --- /dev/null +++ b/airbyte-integrations/connectors/source-microsoft-dataverse/source_microsoft_dataverse/source.py @@ -0,0 +1,102 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
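+# Connector entrypoint: discover() builds the catalog from Dataverse
+# EntityDefinitions metadata, check_connection() probes the Web API root,
+# and streams() instantiates one stream per configured catalog entry.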
+# + +import logging +from typing import Any, Iterator, List, Mapping, MutableMapping, Tuple, Union + +from airbyte_cdk.models import AirbyteCatalog, AirbyteMessage, AirbyteStateMessage, AirbyteStream, ConfiguredAirbyteCatalog, SyncMode +from airbyte_cdk.sources import AbstractSource +from airbyte_cdk.sources.streams import Stream + +from .dataverse import convert_dataverse_type, do_request, get_auth +from .streams import IncrementalMicrosoftDataverseStream, MicrosoftDataverseStream + + +class SourceMicrosoftDataverse(AbstractSource): + def __init__(self): + self.catalogs = None + + def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog: + response = do_request(config, "EntityDefinitions?$expand=Attributes") + response_json = response.json() + streams = [] + for entity in response_json["value"]: + schema = {"properties": {}} + for attribute in entity["Attributes"]: + dataverse_type = attribute["AttributeType"] + if dataverse_type == "Lookup": + attribute["LogicalName"] = "_" + attribute["LogicalName"] + "_value" + attribute_type = convert_dataverse_type(dataverse_type) + + if not attribute_type: + continue + + schema["properties"][attribute["LogicalName"]] = attribute_type + + if entity["CanChangeTrackingBeEnabled"]["Value"] and entity["ChangeTrackingEnabled"]: + schema["properties"].update({"_ab_cdc_updated_at": {"type": "string"}, "_ab_cdc_deleted_at": {"type": ["null", "string"]}}) + stream = AirbyteStream( + name=entity["LogicalName"], json_schema=schema, supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental] + ) + stream.source_defined_cursor = True + if "modifiedon" in schema["properties"]: + stream.default_cursor_field = ["modifiedon"] + else: + stream = AirbyteStream(name=entity["LogicalName"], json_schema=schema, supported_sync_modes=[SyncMode.full_refresh]) + + stream.source_defined_primary_key = [[entity["PrimaryIdAttribute"]]] + streams.append(stream) + return AirbyteCatalog(streams=streams) + + def check_connection(self, logger, config) -> Tuple[bool, any]: + """ + :param config: the user-input config object conforming to the connector's spec.yaml + :param logger: logger object + :return Tuple[bool, any]: (True, None) if the input config can be used to connect to the API successfully, (False, error) otherwise. + """ + try: + response = do_request(config, "") + # Raises an exception for error codes (4xx or 5xx) + response.raise_for_status() + return True, None + except Exception as e: + return False, e + + def read( + self, + logger: logging.Logger, + config: Mapping[str, Any], + catalog: ConfiguredAirbyteCatalog, + state: Union[List[AirbyteStateMessage], MutableMapping[str, Any]] = None, + ) -> Iterator[AirbyteMessage]: + self.catalogs = catalog + return super().read(logger, config, catalog, state) + + def streams(self, config: Mapping[str, Any]) -> List[Stream]: + """ + :param config: A Mapping of the user input configuration as defined in the connector spec. 
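+        :return: One stream instance per entry of the configured catalog captured by read(), since streams are discovered dynamically from Dataverse metadata.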
+        """
+        auth = get_auth(config)
+
+        streams = []
+        for catalog in self.catalogs.streams:
+            response = do_request(config, f"EntityDefinitions(LogicalName='{catalog.stream.name}')")
+            response_json = response.json()
+
+            args = {
+                "url": config["url"],
+                "stream_name": catalog.stream.name,
+                "stream_path": response_json["EntitySetName"],
+                "primary_key": catalog.primary_key,
+                "schema": catalog.stream.json_schema,
+                "odata_maxpagesize": config["odata_maxpagesize"],
+                "authenticator": auth,
+            }
+
+            if catalog.sync_mode == SyncMode.incremental:
+                streams.append(IncrementalMicrosoftDataverseStream(**args, config_cursor_field=catalog.cursor_field))
+            else:
+                streams.append(MicrosoftDataverseStream(**args))
+
+        return streams
diff --git a/airbyte-integrations/connectors/source-microsoft-dataverse/source_microsoft_dataverse/spec.yaml b/airbyte-integrations/connectors/source-microsoft-dataverse/source_microsoft_dataverse/spec.yaml
new file mode 100644
index 0000000000000..1768da97ee2b5
--- /dev/null
+++ b/airbyte-integrations/connectors/source-microsoft-dataverse/source_microsoft_dataverse/spec.yaml
@@ -0,0 +1,47 @@
+documentationUrl: https://docs.airbyte.io/integrations/sources/microsoft-dataverse
+connectionSpecification:
+  $schema: http://json-schema.org/draft-07/schema#
+  title: Microsoft Dataverse Spec
+  type: object
+  required:
+    - url
+    - tenant_id
+    - client_id
+    - client_secret_value
+  additionalProperties: true
+  properties:
+    url:
+      type: string
+      description: URL to Microsoft Dataverse API
+      title: URL
+      examples:
+        - https://<org-id>.crm.dynamics.com
+      order: 0
+
+    tenant_id:
+      type: string
+      description: Tenant Id of your Microsoft Dataverse Instance
+      title: Tenant Id
+      airbyte_secret: true
+      order: 1
+
+    client_id:
+      type: string
+      description: App Registration Client Id
+      title: Client Id
+      airbyte_secret: true
+      order: 2
+
+    client_secret_value:
+      type: string
+      description: App Registration Client Secret
+      title: Client Secret
+      airbyte_secret: true
+      order: 3
+
+    odata_maxpagesize:
+      type: integer
+      description: Max number of results per page. Default=5000
+      title: Max page size
+      default: 5000
+      order: 4
diff --git a/airbyte-integrations/connectors/source-microsoft-dataverse/source_microsoft_dataverse/streams.py b/airbyte-integrations/connectors/source-microsoft-dataverse/source_microsoft_dataverse/streams.py
new file mode 100644
index 0000000000000..2e8b6a35fdc9d
--- /dev/null
+++ b/airbyte-integrations/connectors/source-microsoft-dataverse/source_microsoft_dataverse/streams.py
@@ -0,0 +1,150 @@
+#
+# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
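+# Stream implementations: MicrosoftDataverseStream pages through entity data
+# via @odata.nextLink, and IncrementalMicrosoftDataverseStream adds OData
+# change tracking ($deltatoken) on top of it.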
+# + +from abc import ABC +from datetime import datetime +from typing import Any, Iterable, Mapping, MutableMapping, Optional +from urllib import parse + +import requests +from airbyte_cdk.sources.streams import IncrementalMixin +from airbyte_cdk.sources.streams.http import HttpStream + + +# Basic full refresh stream +class MicrosoftDataverseStream(HttpStream, ABC): + + # Base url will be set by init(), using information provided by the user through config input + url_base = "" + primary_key = "" + + def __init__(self, url, stream_name, stream_path, schema, primary_key, odata_maxpagesize, **kwargs): + super().__init__(**kwargs) + self.url_base = url + "/api/data/v9.2/" + self.stream_name = stream_name + self.stream_path = stream_path + self.primary_key = primary_key + self.schema = schema + self.odata_maxpagesize = odata_maxpagesize + + @property + def name(self) -> str: + """Source name""" + return self.stream_name + + def get_json_schema(self) -> Mapping[str, Any]: + return self.schema + + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + """ + :param response: the most recent response from the API + :return If there is another page in the result, a mapping (e.g: dict) containing information needed to query the next page in the response. + If there are no more pages in the result, return None. + """ + + response_json = response.json() + + if "@odata.nextLink" in response_json: + next_link = response_json["@odata.nextLink"] + next_link_params = dict(parse.parse_qsl(parse.urlsplit(next_link).query)) + return next_link_params + else: + return None + + def request_params( + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None + ) -> MutableMapping[str, Any]: + """ + :return a dict containing the parameters to be used in the request + """ + request_params = super().request_params(stream_state) + # If there is not a nextLink(contains "next_page_token") in the response, means it is the last page. + # In this case, the deltatoken is passed instead. 
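+        # For incremental streams, stream_state holds {"$deltatoken": ...}; for
+        # full refresh streams it is an empty dict, so the update below is a no-op.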
+ if next_page_token is None: + request_params.update(stream_state) + return request_params + elif next_page_token is not None: + request_params.update(next_page_token) + return request_params + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + """ + :return an iterable containing each record in the response + """ + for result in response.json()["value"]: + yield result + + def request_headers( + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + ) -> Mapping[str, Any]: + return { + "Cache-Control": "no-cache", + "OData-Version": "4.0", + "Content-Type": "application/json", + "Prefer": "odata.maxpagesize=" + str(self.odata_maxpagesize), + } + + def path( + self, + *, + stream_state: Mapping[str, Any] = None, + stream_slice: Mapping[str, Any] = None, + next_page_token: Mapping[str, Any] = None, + ) -> str: + return self.stream_path + + +# Basic incremental stream +class IncrementalMicrosoftDataverseStream(MicrosoftDataverseStream, IncrementalMixin, ABC): + + delta_token_field = "$deltatoken" + state_checkpoint_interval = None # For now we just use the change tracking as state, and it is only emitted on last page + + def __init__(self, url, stream_name, stream_path, schema, primary_key, odata_maxpagesize, config_cursor_field, **kwargs): + super().__init__(url, stream_name, stream_path, schema, primary_key, odata_maxpagesize, **kwargs) + self._cursor_value = None + self.config_cursor_field = config_cursor_field + + @property + def state(self) -> Mapping[str, Any]: + return {self.delta_token_field: str(self._cursor_value)} + + @property + def cursor_field(self) -> str: + return self.config_cursor_field + + # Sets the state got by state getter. "value" is the return of state getter -> dict + @state.setter + def state(self, value: Mapping[str, Any]): + self._cursor_value = value[self.delta_token_field] + + def request_headers( + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + ) -> Mapping[str, Any]: + """ + Override to return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method. 
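+        Here the base headers are extended with the odata.track-changes preference, so that the service returns an @odata.deltaLink (carrying the next $deltatoken) on the last page of results.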
+ """ + request_headers = super().request_headers(stream_state=stream_state) + request_headers.update( + {"Prefer": "odata.track-changes," + request_headers["Prefer"]} + ) # odata.track-changes -> Header that enables change tracking + return request_headers + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + response_json = response.json() + if "@odata.deltaLink" in response_json: + delta_link = response_json["@odata.deltaLink"] + delta_link_params = dict(parse.parse_qsl(parse.urlsplit(delta_link).query)) + self._cursor_value = delta_link_params[self.delta_token_field] + for result in response_json["value"]: + if "@odata.context" in result and result["reason"] == "deleted": + result.update({self.primary_key[0][0]: result["id"]}) + result.pop("@odata.context", None) + result.pop("id", None) + result.pop("reason", None) + result.update({"_ab_cdc_deleted_at": datetime.now().isoformat()}) + else: + result.update({"_ab_cdc_updated_at": result[self.cursor_field[0]]}) + + yield result diff --git a/airbyte-integrations/connectors/source-microsoft-dataverse/unit_tests/__init__.py b/airbyte-integrations/connectors/source-microsoft-dataverse/unit_tests/__init__.py new file mode 100644 index 0000000000000..1100c1c58cf51 --- /dev/null +++ b/airbyte-integrations/connectors/source-microsoft-dataverse/unit_tests/__init__.py @@ -0,0 +1,3 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# diff --git a/airbyte-integrations/connectors/source-microsoft-dataverse/unit_tests/test_dataverse.py b/airbyte-integrations/connectors/source-microsoft-dataverse/unit_tests/test_dataverse.py new file mode 100644 index 0000000000000..834daa5923da7 --- /dev/null +++ b/airbyte-integrations/connectors/source-microsoft-dataverse/unit_tests/test_dataverse.py @@ -0,0 +1,17 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +import pytest +from source_microsoft_dataverse.dataverse import AirbyteType, convert_dataverse_type + + +@pytest.mark.parametrize("dataverse_type,expected_result", [ + ("String", AirbyteType.String.value), + ("Integer", AirbyteType.Integer.value), + ("Virtual", None), + ("Random", AirbyteType.String.value) +]) +def test_convert_dataverse_type(dataverse_type, expected_result): + result = convert_dataverse_type(dataverse_type) + assert result == expected_result diff --git a/airbyte-integrations/connectors/source-microsoft-dataverse/unit_tests/test_incremental_streams.py b/airbyte-integrations/connectors/source-microsoft-dataverse/unit_tests/test_incremental_streams.py new file mode 100644 index 0000000000000..f4db9179ad44e --- /dev/null +++ b/airbyte-integrations/connectors/source-microsoft-dataverse/unit_tests/test_incremental_streams.py @@ -0,0 +1,110 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
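+# Unit tests for IncrementalMicrosoftDataverseStream: constructor plumbing,
+# delta-token state handling, and the CDC columns injected by parse_response.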
+# + +from unittest.mock import MagicMock + +from pytest import fixture +from source_microsoft_dataverse.source import IncrementalMicrosoftDataverseStream + + +@fixture +def incremental_config(): + return { + "url": "http://test-url", + "stream_name": "test_stream", + "stream_path": "test_path", + "primary_key": [["test_primary_key"]], + "schema": { + + }, + "odata_maxpagesize": 100, + "config_cursor_field": ["test_cursor_field"], + "authenticator": MagicMock() + } + + +@fixture +def incremental_response(incremental_config): + return { + "@odata.deltaLink": f"{incremental_config['url']}?$deltatoken=12644418993%2110%2F06%2F2022%2020%3A06%3A12", + "value": [ + { + "test_primary_key": "pk", + "test_cursor_field": "test-date" + }, + { + "id": "pk2", + "@odata.context": "context", + "reason": "deleted" + } + ] + } + + +def test_primary_key(incremental_config): + stream = IncrementalMicrosoftDataverseStream(**incremental_config) + expected_primary_key = [["test_primary_key"]] + assert stream.primary_key == expected_primary_key + + +def test_stream_name(incremental_config): + stream = IncrementalMicrosoftDataverseStream(**incremental_config) + expected_stream_name = "test_stream" + assert stream.name == expected_stream_name + + +def test_stream_path(incremental_config): + stream = IncrementalMicrosoftDataverseStream(**incremental_config) + expected_stream_path = "test_path" + assert stream.path() == expected_stream_path + + +def test_cursor_field(incremental_config): + stream = IncrementalMicrosoftDataverseStream(**incremental_config) + expected_cursor_field = ["test_cursor_field"] + assert stream.cursor_field == expected_cursor_field + + +def test_supports_incremental(incremental_config, mocker): + mocker.patch.object(IncrementalMicrosoftDataverseStream, "cursor_field", "dummy_field") + stream = IncrementalMicrosoftDataverseStream(**incremental_config) + assert stream.supports_incremental + + +def test_source_defined_cursor(incremental_config): + stream = IncrementalMicrosoftDataverseStream(**incremental_config) + assert stream.source_defined_cursor + + +def test_stream_checkpoint_interval(incremental_config): + stream = IncrementalMicrosoftDataverseStream(**incremental_config) + expected_checkpoint_interval = None + assert stream.state_checkpoint_interval == expected_checkpoint_interval + + +def test_parse_request(incremental_config, incremental_response, mocker): + response_mock, datetime_mock = MagicMock(), MagicMock() + response_mock.json.return_value = incremental_response + datetime_mock.now.return_value.isoformat.return_value = "test-time" + mocker.patch("source_microsoft_dataverse.streams.datetime", datetime_mock) + + stream = IncrementalMicrosoftDataverseStream(**incremental_config) + + iterable = stream.parse_response(response_mock) + iterable_list = list(iterable) + assert len(iterable_list) == 2 + assert stream.state[stream.delta_token_field] == "12644418993!10/06/2022 20:06:12" + assert iterable_list[0]["_ab_cdc_updated_at"] == "test-date" + assert iterable_list[1]["_ab_cdc_deleted_at"] == "test-time" + assert iterable_list[1][incremental_config["primary_key"][0][0]] == "pk2" + assert "id" not in iterable_list[1] + assert "reason" not in iterable_list[1] + assert "@odata.context" not in iterable_list[1] + + +def test_request_headers(incremental_config): + stream = IncrementalMicrosoftDataverseStream(**incremental_config) + headers = stream.request_headers(stream_state={}) + assert "Prefer" in headers + assert headers["Prefer"] == "odata.track-changes,odata.maxpagesize=100" diff 
--git a/airbyte-integrations/connectors/source-microsoft-dataverse/unit_tests/test_source.py b/airbyte-integrations/connectors/source-microsoft-dataverse/unit_tests/test_source.py new file mode 100644 index 0000000000000..bb93ad44b8239 --- /dev/null +++ b/airbyte-integrations/connectors/source-microsoft-dataverse/unit_tests/test_source.py @@ -0,0 +1,144 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +import json +from unittest import mock +from unittest.mock import MagicMock + +from airbyte_cdk.models import SyncMode +from source_microsoft_dataverse.dataverse import AirbyteType +from source_microsoft_dataverse.source import SourceMicrosoftDataverse +from source_microsoft_dataverse.streams import IncrementalMicrosoftDataverseStream, MicrosoftDataverseStream + + +@mock.patch("source_microsoft_dataverse.source.do_request") +def test_check_connection(mock_request): + mock_request.return_value.raise_for_status = lambda: () + source = SourceMicrosoftDataverse() + logger_mock, config_mock = MagicMock(), MagicMock() + assert source.check_connection(logger_mock, config_mock) == (True, None) + + +@mock.patch("source_microsoft_dataverse.source.get_auth") +@mock.patch("source_microsoft_dataverse.source.do_request") +def test_streams_incremental(mock_get_auth, mock_request): + streams = MagicMock() + streams.sync_mode = SyncMode.incremental + streams.stream.name = "test" + + catalog = MagicMock() + + catalog.streams = [streams] + + config_mock = MagicMock() + source = SourceMicrosoftDataverse() + source.catalogs = catalog + + streams = source.streams(config_mock) + + expected_streams_number = 1 + assert len(streams) == expected_streams_number + assert isinstance(streams[0], IncrementalMicrosoftDataverseStream) + assert streams[0].name == "test" + + +@mock.patch("source_microsoft_dataverse.source.get_auth") +@mock.patch("source_microsoft_dataverse.source.do_request") +def test_streams_full_refresh(mock_get_auth, mock_request): + streams = MagicMock() + streams.sync_mode = SyncMode.full_refresh + streams.stream.name = "test" + + catalog = MagicMock() + + catalog.streams = [streams] + + config_mock = MagicMock() + source = SourceMicrosoftDataverse() + source.catalogs = catalog + + streams = source.streams(config_mock) + + expected_streams_number = 1 + assert len(streams) == expected_streams_number + assert isinstance(streams[0], MicrosoftDataverseStream) + assert streams[0].name == "test" + + +@mock.patch("source_microsoft_dataverse.source.do_request") +def test_discover_incremental(mock_request): + result_json = json.loads(''' + { + "value": [ + { + "LogicalName": "stream", + "PrimaryIdAttribute": "primary", + "ChangeTrackingEnabled": true, + "CanChangeTrackingBeEnabled": { + "Value": true + }, + "Attributes": [ + { + "LogicalName": "test", + "AttributeType": "String" + }, + { + "LogicalName": "modifiedon", + "AttributeType": "DateTime" + } + ] + } + ] + } + ''') + + mock_request.return_value.status.return_value = 200 + mock_request.return_value.json.return_value = result_json + + source = SourceMicrosoftDataverse() + logger_mock, config_mock = MagicMock(), MagicMock() + + catalog = source.discover(logger_mock, config_mock) + + assert not {'modifiedon'} ^ set(catalog.streams[0].default_cursor_field) + assert not {SyncMode.full_refresh, SyncMode.incremental} ^ set(catalog.streams[0].supported_sync_modes) + assert not {'primary'} ^ set(catalog.streams[0].source_defined_primary_key[0]) + assert catalog.streams[0].json_schema["properties"]["test"] == AirbyteType.String.value + + 
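+# Note: the discover() assertions above and below use set symmetric difference
+# (^): an empty (falsy) result means the two sets are exactly equal.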
+@mock.patch("source_microsoft_dataverse.source.do_request") +def test_discover_full_refresh(mock_request): + result_json = json.loads(''' + { + "value": [ + { + "LogicalName": "stream", + "PrimaryIdAttribute": "primary", + "ChangeTrackingEnabled": false, + "CanChangeTrackingBeEnabled": { + "Value": false + }, + "Attributes": [ + { + "LogicalName": "test", + "AttributeType": "String" + } + ] + } + ] + } + ''') + + mock_request.return_value.status.return_value = 200 + mock_request.return_value.json.return_value = result_json + + source = SourceMicrosoftDataverse() + logger_mock, config_mock = MagicMock(), MagicMock() + + catalog = source.discover(logger_mock, config_mock) + + assert catalog.streams[0].default_cursor_field is None or len(catalog.streams[0].default_cursor_field) == 0 + assert not {SyncMode.full_refresh} ^ set(catalog.streams[0].supported_sync_modes) + assert not {'primary'} ^ set(catalog.streams[0].source_defined_primary_key[0]) + assert catalog.streams[0].json_schema["properties"]["test"] == AirbyteType.String.value diff --git a/airbyte-integrations/connectors/source-microsoft-dataverse/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-microsoft-dataverse/unit_tests/test_streams.py new file mode 100644 index 0000000000000..d58f2763229e6 --- /dev/null +++ b/airbyte-integrations/connectors/source-microsoft-dataverse/unit_tests/test_streams.py @@ -0,0 +1,113 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +from http import HTTPStatus +from unittest.mock import MagicMock + +import pytest +from pytest import fixture +from source_microsoft_dataverse.source import MicrosoftDataverseStream + + +@fixture +def incremental_config(): + return { + "url": "http://test-url", + "stream_name": "test_stream", + "stream_path": "test_path", + "primary_key": [["test_primary_key"]], + "schema": { + + }, + "odata_maxpagesize": 100, + "authenticator": MagicMock() + } + + +@pytest.mark.parametrize( + ("inputs", "expected_params"), + [ + ({"stream_slice": None, "stream_state": {}, "next_page_token": None}, {}), + ({"stream_slice": None, "stream_state": {}, "next_page_token": {"$skiptoken": "skiptoken"}}, {"$skiptoken": "skiptoken"}), + ({"stream_slice": None, "stream_state": {"$deltatoken": "delta"}, "next_page_token": None}, {"$deltatoken": "delta"}) + ], +) +def test_request_params(inputs, expected_params, incremental_config): + stream = MicrosoftDataverseStream(**incremental_config) + assert stream.request_params(**inputs) == expected_params + + +@pytest.mark.parametrize( + ("response_json", "next_page_token"), + [ + ({"@odata.nextLink": "https://url?$skiptoken=oEBwdSP6uehIAxQOWq_3Ksh_TLol6KIm3stvdc6hGhZRi1hQ7Spe__dpvm3U4zReE4CYXC2zOtaKdi7KHlUtC2CbRiBIUwOxPKLa"}, + {"$skiptoken": "oEBwdSP6uehIAxQOWq_3Ksh_TLol6KIm3stvdc6hGhZRi1hQ7Spe__dpvm3U4zReE4CYXC2zOtaKdi7KHlUtC2CbRiBIUwOxPKLa"}), + ({"value": []}, None), + ], +) +def test_next_page_token(response_json, next_page_token, incremental_config): + stream = MicrosoftDataverseStream(**incremental_config) + response = MagicMock() + response.json.return_value = response_json + inputs = {"response": response} + expected_token = next_page_token + assert stream.next_page_token(**inputs) == expected_token + + +def test_parse_response(incremental_config): + stream = MicrosoftDataverseStream(**incremental_config) + response = MagicMock() + response.json.return_value = { + "value": [ + { + "test-key": "test-value" + } + ] + } + inputs = {"response": response} + expected_parsed_object = { + "test-key": "test-value" + } + 
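+    # parse_response is a generator over the response's "value" array,
+    # so next() pulls the first parsed record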
    assert next(stream.parse_response(**inputs)) == expected_parsed_object
+
+
+def test_request_headers(incremental_config):
+    stream = MicrosoftDataverseStream(**incremental_config)
+    inputs = {"stream_slice": None, "stream_state": None, "next_page_token": None}
+    expected_headers = {
+        "Cache-Control": "no-cache",
+        "OData-Version": "4.0",
+        "Content-Type": "application/json",
+        "Prefer": "odata.maxpagesize=100"
+    }
+    assert stream.request_headers(**inputs) == expected_headers
+
+
+def test_http_method(incremental_config):
+    stream = MicrosoftDataverseStream(**incremental_config)
+    expected_method = "GET"
+    assert stream.http_method == expected_method
+
+
+@pytest.mark.parametrize(
+    ("http_status", "should_retry"),
+    [
+        (HTTPStatus.OK, False),
+        (HTTPStatus.BAD_REQUEST, False),
+        (HTTPStatus.TOO_MANY_REQUESTS, True),
+        (HTTPStatus.INTERNAL_SERVER_ERROR, True),
+    ],
+)
+def test_should_retry(incremental_config, http_status, should_retry):
+    response_mock = MagicMock()
+    response_mock.status_code = http_status
+    stream = MicrosoftDataverseStream(**incremental_config)
+    assert stream.should_retry(response_mock) == should_retry
+
+
+def test_backoff_time(incremental_config):
+    response_mock = MagicMock()
+    stream = MicrosoftDataverseStream(**incremental_config)
+    expected_backoff_time = None
+    assert stream.backoff_time(response_mock) == expected_backoff_time
diff --git a/docs/integrations/sources/microsoft-dataverse.md b/docs/integrations/sources/microsoft-dataverse.md
new file mode 100644
index 0000000000000..6e67c663df270
--- /dev/null
+++ b/docs/integrations/sources/microsoft-dataverse.md
@@ -0,0 +1,64 @@
+# Microsoft Dataverse
+
+## Sync overview
+
+This source syncs data from [Microsoft Dataverse](https://learn.microsoft.com/en-us/power-apps/developer/data-platform/overview) through the [Microsoft Dataverse Web API](https://learn.microsoft.com/en-us/power-apps/developer/data-platform/webapi/overview).
+
+This connector currently uses version 9.2 of the API.
+
+### Output schema
+
+This source automatically discovers the schema of the entities in your Dataverse instance using the API
+https://<url>/api/data/v9.2/EntityDefinitions?$expand=Attributes
+
+### Data type mapping
+
+| Integration Type   | Airbyte Type              | Notes                 |
+|:-------------------|:--------------------------|:----------------------|
+| `String`           | `string`                  |                       |
+| `UniqueIdentifier` | `string`                  |                       |
+| `DateTime`         | `timestamp with timezone` |                       |
+| `Integer`          | `integer`                 |                       |
+| `BigInt`           | `integer`                 |                       |
+| `Money`            | `number`                  |                       |
+| `Boolean`          | `boolean`                 |                       |
+| `Double`           | `number`                  |                       |
+| `Decimal`          | `number`                  |                       |
+| `Status`           | `integer`                 |                       |
+| `State`            | `integer`                 |                       |
+| `Picklist`         | `integer`                 |                       |
+| `Lookup`           | `string`                  |                       |
+| `Virtual`          | None                      | We skip virtual types |
+
+Other types are defined as `string`.
+
+### Features
+
+| Feature                       | Supported?\(Yes/No\) | Notes                                                             |
+|:------------------------------|:---------------------|:------------------------------------------------------------------|
+| Full Refresh Sync             | Yes                  |                                                                   |
+| Incremental Sync              | Yes                  |                                                                   |
+| CDC                           | Yes                  | Not all entities support it. Deleted records contain only the ID  |
+| Replicate Incremental Deletes | Yes                  |                                                                   |
+| SSL connection                | Yes                  |                                                                   |
+| Namespaces                    | No                   |                                                                   |
+
+## Getting started
+
+### Requirements
+
+* Application \(client\) ID
+* Directory \(tenant\) ID
+* Client secrets
+
+### Setup guide
+
+The Microsoft Dataverse API uses OAuth2 for authentication. We need 'client_credentials' grant-type credentials, which we usually obtain by creating an App Registration.
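+
+For reference, the token acquisition that the connector performs under the hood can be reproduced with plain Python, a minimal sketch where every placeholder value is illustrative:
+
+```python
+import requests
+
+tenant_id = "<tenant-id>"
+url = "https://<org-id>.crm.dynamics.com"
+
+# client_credentials grant against the Microsoft identity platform
+token_response = requests.post(
+    f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token",
+    data={
+        "grant_type": "client_credentials",
+        "client_id": "<client-id>",
+        "client_secret": "<client-secret-value>",
+        "scope": f"{url}/.default",
+    },
+)
+access_token = token_response.json()["access_token"]
+
+# the token is then sent as a Bearer token to the Dataverse Web API
+entities = requests.get(
+    f"{url}/api/data/v9.2/EntityDefinitions?$expand=Attributes",
+    headers={"Authorization": f"Bearer {access_token}"},
+)
+print(entities.status_code)
+```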
+The full OAuth flow is documented by Microsoft:
+https://learn.microsoft.com/en-us/power-apps/developer/data-platform/authenticate-oauth
+
+The procedure to generate the credentials and set up the necessary permissions is well described in this post from the Magnetism blog:
+https://blog.magnetismsolutions.com/blog/paulnieuwelaar/2021/9/21/setting-up-an-application-user-in-dynamics-365
+
+
+## CHANGELOG
+
+| Version | Date       | Pull Request                                             | Subject                            |
+|:--------|:-----------|:---------------------------------------------------------|:-----------------------------------|
+| 0.1.0   | 2022-11-14 | [18646](https://github.com/airbytehq/airbyte/pull/18646) | 🎉 New Source: Microsoft Dataverse |