From eecbf8551e9119edb96724e2b12c1f642abcc9f9 Mon Sep 17 00:00:00 2001 From: lazebnyi Date: Mon, 6 Sep 2021 19:57:46 +0300 Subject: [PATCH 1/7] Add Marketo implementation --- .github/workflows/publish-command.yml | 1 + .github/workflows/test-command.yml | 1 + .../9e0556f4-69df-4522-a3fb-03264d36b348.json | 4 +- .../resources/seed/source_definitions.yaml | 4 +- .../connectors/source-marketo/.dockerignore | 0 .../connectors/source-marketo/Dockerfile | 16 + .../connectors/source-marketo/README.md | 131 +++++ .../source-marketo/acceptance-test-config.yml | 27 + .../source-marketo/acceptance-test-docker.sh | 16 + ....py::TestBasicRead::test_read[inputs0].txt | 0 ...efresh::test_sequential_reads[inputs0].txt | 0 .../connectors/source-marketo/build.gradle | 14 + .../integration_tests/__init__.py | 0 .../integration_tests/abnormal_state.json | 17 + .../integration_tests/acceptance.py | 34 ++ .../integration_tests/catalog.json | 35 ++ .../integration_tests/configured_catalog.json | 68 +++ .../integration_tests/invalid_config.json | 7 + .../integration_tests/sample_config.json | 7 + .../integration_tests/sample_state.json | 17 + .../connectors/source-marketo/main.py | 33 ++ .../source-marketo/requirements.txt | 2 + .../connectors/source-marketo/setup.py | 49 ++ .../source-marketo/source_marketo/__init__.py | 28 + .../schemas/activity_types.json | 40 ++ .../source_marketo/schemas/campaigns.json | 38 ++ .../source_marketo/schemas/leads.json | 281 ++++++++++ .../source_marketo/schemas/lists.json | 32 ++ .../source_marketo/schemas/programs.json | 52 ++ .../source-marketo/source_marketo/source.py | 530 ++++++++++++++++++ .../source-marketo/source_marketo/spec.json | 40 ++ .../source-marketo/source_marketo/utils.py | 82 +++ .../source-marketo/unit_tests/__init__.py | 23 + .../source-marketo/unit_tests/unit_test.py | 60 ++ docs/integrations/README.md | 2 +- docs/integrations/sources/marketo.md | 16 +- tools/bin/ci_credentials.sh | 1 + 37 files changed, 1695 insertions(+), 13 deletions(-) create mode 100644 airbyte-integrations/connectors/source-marketo/.dockerignore create mode 100644 airbyte-integrations/connectors/source-marketo/Dockerfile create mode 100644 airbyte-integrations/connectors/source-marketo/README.md create mode 100644 airbyte-integrations/connectors/source-marketo/acceptance-test-config.yml create mode 100644 airbyte-integrations/connectors/source-marketo/acceptance-test-docker.sh create mode 100644 airbyte-integrations/connectors/source-marketo/acceptance_tests_logs/test_core.py::TestBasicRead::test_read[inputs0].txt create mode 100644 airbyte-integrations/connectors/source-marketo/acceptance_tests_logs/test_full_refresh.py::TestFullRefresh::test_sequential_reads[inputs0].txt create mode 100644 airbyte-integrations/connectors/source-marketo/build.gradle create mode 100644 airbyte-integrations/connectors/source-marketo/integration_tests/__init__.py create mode 100644 airbyte-integrations/connectors/source-marketo/integration_tests/abnormal_state.json create mode 100644 airbyte-integrations/connectors/source-marketo/integration_tests/acceptance.py create mode 100644 airbyte-integrations/connectors/source-marketo/integration_tests/catalog.json create mode 100644 airbyte-integrations/connectors/source-marketo/integration_tests/configured_catalog.json create mode 100644 airbyte-integrations/connectors/source-marketo/integration_tests/invalid_config.json create mode 100644 airbyte-integrations/connectors/source-marketo/integration_tests/sample_config.json create mode 100644 airbyte-integrations/connectors/source-marketo/integration_tests/sample_state.json create mode 100644 airbyte-integrations/connectors/source-marketo/main.py create mode 100644 airbyte-integrations/connectors/source-marketo/requirements.txt create mode 100644 airbyte-integrations/connectors/source-marketo/setup.py create mode 100644 airbyte-integrations/connectors/source-marketo/source_marketo/__init__.py create mode 100644 airbyte-integrations/connectors/source-marketo/source_marketo/schemas/activity_types.json create mode 100644 airbyte-integrations/connectors/source-marketo/source_marketo/schemas/campaigns.json create mode 100644 airbyte-integrations/connectors/source-marketo/source_marketo/schemas/leads.json create mode 100644 airbyte-integrations/connectors/source-marketo/source_marketo/schemas/lists.json create mode 100644 airbyte-integrations/connectors/source-marketo/source_marketo/schemas/programs.json create mode 100644 airbyte-integrations/connectors/source-marketo/source_marketo/source.py create mode 100644 airbyte-integrations/connectors/source-marketo/source_marketo/spec.json create mode 100644 airbyte-integrations/connectors/source-marketo/source_marketo/utils.py create mode 100644 airbyte-integrations/connectors/source-marketo/unit_tests/__init__.py create mode 100644 airbyte-integrations/connectors/source-marketo/unit_tests/unit_test.py diff --git a/.github/workflows/publish-command.yml b/.github/workflows/publish-command.yml index 9b3e1b3b15ba6..ba554b3c2ca9f 100644 --- a/.github/workflows/publish-command.yml +++ b/.github/workflows/publish-command.yml @@ -136,6 +136,7 @@ jobs: SNOWFLAKE_S3_COPY_INTEGRATION_TEST_CREDS: ${{ secrets.SNOWFLAKE_S3_COPY_INTEGRATION_TEST_CREDS }} SNOWFLAKE_GCS_COPY_INTEGRATION_TEST_CREDS: ${{ secrets.SNOWFLAKE_GCS_COPY_INTEGRATION_TEST_CREDS }} SOURCE_SQUARE_CREDS: ${{ secrets.SOURCE_SQUARE_CREDS }} + SOURCE_MARKETO_TEST_CREDS: ${{ secrets.SOURCE_MARKETO_TEST_CREDS }} SOURCE_MARKETO_SINGER_INTEGRATION_TEST_CONFIG: ${{ secrets.SOURCE_MARKETO_SINGER_INTEGRATION_TEST_CONFIG }} SOURCE_RECURLY_INTEGRATION_TEST_CREDS: ${{ secrets.SOURCE_RECURLY_INTEGRATION_TEST_CREDS }} SOURCE_S3_TEST_CREDS: ${{ secrets.SOURCE_S3_TEST_CREDS }} diff --git a/.github/workflows/test-command.yml b/.github/workflows/test-command.yml index bc4985f6c213d..478c36301f9f7 100644 --- a/.github/workflows/test-command.yml +++ b/.github/workflows/test-command.yml @@ -136,6 +136,7 @@ jobs: SNOWFLAKE_S3_COPY_INTEGRATION_TEST_CREDS: ${{ secrets.SNOWFLAKE_S3_COPY_INTEGRATION_TEST_CREDS }} SNOWFLAKE_GCS_COPY_INTEGRATION_TEST_CREDS: ${{ secrets.SNOWFLAKE_GCS_COPY_INTEGRATION_TEST_CREDS }} SOURCE_SQUARE_CREDS: ${{ secrets.SOURCE_SQUARE_CREDS }} + SOURCE_MARKETO_TEST_CREDS: ${{ secrets.SOURCE_MARKETO_TEST_CREDS }} SOURCE_MARKETO_SINGER_INTEGRATION_TEST_CONFIG: ${{ secrets.SOURCE_MARKETO_SINGER_INTEGRATION_TEST_CONFIG }} SOURCE_RECURLY_INTEGRATION_TEST_CREDS: ${{ secrets.SOURCE_RECURLY_INTEGRATION_TEST_CREDS }} SOURCE_S3_TEST_CREDS: ${{ secrets.SOURCE_S3_TEST_CREDS }} diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/9e0556f4-69df-4522-a3fb-03264d36b348.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/9e0556f4-69df-4522-a3fb-03264d36b348.json index a406a267acb41..acc3e0d307a55 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/9e0556f4-69df-4522-a3fb-03264d36b348.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/9e0556f4-69df-4522-a3fb-03264d36b348.json @@ -1,8 +1,8 @@ { "sourceDefinitionId": "9e0556f4-69df-4522-a3fb-03264d36b348", "name": "Marketo", - "dockerRepository": "airbyte/source-marketo-singer", - "dockerImageTag": "0.2.3", + "dockerRepository": "airbyte/source-marketo", + "dockerImageTag": "0.1.0", "documentationUrl": "https://docs.airbyte.io/integrations/sources/marketo", "icon": "marketo.svg" } diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index a6bc737074bc3..154b8fbcc4be7 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -88,8 +88,8 @@ icon: sendgrid.svg - sourceDefinitionId: 9e0556f4-69df-4522-a3fb-03264d36b348 name: Marketo - dockerRepository: airbyte/source-marketo-singer - dockerImageTag: 0.2.3 + dockerRepository: airbyte/source-marketo + dockerImageTag: 0.1.0 documentationUrl: https://docs.airbyte.io/integrations/sources/marketo icon: marketo.svg - sourceDefinitionId: 71607ba1-c0ac-4799-8049-7f4b90dd50f7 diff --git a/airbyte-integrations/connectors/source-marketo/.dockerignore b/airbyte-integrations/connectors/source-marketo/.dockerignore new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/airbyte-integrations/connectors/source-marketo/Dockerfile b/airbyte-integrations/connectors/source-marketo/Dockerfile new file mode 100644 index 0000000000000..e9426b2decd80 --- /dev/null +++ b/airbyte-integrations/connectors/source-marketo/Dockerfile @@ -0,0 +1,16 @@ +FROM python:3.7-slim + +# Bash is installed for more convenient debugging. +RUN apt-get update && apt-get install -y bash && rm -rf /var/lib/apt/lists/* + +WORKDIR /airbyte/integration_code +COPY source_marketo ./source_marketo +COPY main.py ./ +COPY setup.py ./ +RUN pip install . + +ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" +ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] + +LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.name=airbyte/source-marketo \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-marketo/README.md b/airbyte-integrations/connectors/source-marketo/README.md new file mode 100644 index 0000000000000..13bb4d40d7043 --- /dev/null +++ b/airbyte-integrations/connectors/source-marketo/README.md @@ -0,0 +1,131 @@ +# Marketo Source + +This is the repository for the Marketo source connector, written in Python. +For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.io/integrations/sources/marketo). + +## Local development + +### Prerequisites +**To iterate on this connector, make sure to complete this prerequisites section.** + +#### Minimum Python version required `= 3.7.0` + +#### Build & Activate Virtual Environment and install dependencies +From this connector directory, create a virtual environment: +``` +python -m venv .venv +``` + +This will generate a virtualenv for this module in `.venv/`. Make sure this venv is active in your +development environment of choice. To activate it from the terminal, run: +``` +source .venv/bin/activate +pip install -r requirements.txt +``` +If you are in an IDE, follow your IDE's instructions to activate the virtualenv. + +Note that while we are installing dependencies from `requirements.txt`, you should only edit `setup.py` for your dependencies. `requirements.txt` is +used for editable installs (`pip install -e`) to pull in Python dependencies from the monorepo and will call `setup.py`. +If this is mumbo jumbo to you, don't worry about it, just put your deps in `setup.py` but install using `pip install -r requirements.txt` and everything +should work as you expect. + +#### Building via Gradle +You can also build the connector in Gradle. This is typically used in CI and not needed for your development workflow. + +To build using Gradle, from the Airbyte repository root, run: +``` +./gradlew :airbyte-integrations:connectors:source-marketo:build +``` + +#### Create credentials +**If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.io/integrations/sources/marketo) +to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `source_marketo/spec.json` file. +Note that any directory named `secrets` is gitignored across the entire Airbyte repo, so there is no danger of accidentally checking in sensitive information. +See `integration_tests/sample_config.json` for a sample config file. + +**If you are an Airbyte core member**, copy the credentials in Lastpass under the secret name `source marketo test creds` +and place them into `secrets/config.json`. + +### Locally running the connector +``` +python main.py spec +python main.py check --config secrets/config.json +python main.py discover --config secrets/config.json +python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json +``` + +### Locally running the connector docker image + +#### Build +First, make sure you build the latest Docker image: +``` +docker build . -t airbyte/source-marketo:dev +``` + +You can also build the connector image via Gradle: +``` +./gradlew :airbyte-integrations:connectors:source-marketo:airbyteDocker +``` +When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in +the Dockerfile. + +#### Run +Then run any of the connector commands as follows: +``` +docker run --rm airbyte/source-marketo:dev spec +docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-marketo:dev check --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-marketo:dev discover --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/source-marketo:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json +``` +## Testing +Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named. +First install test dependencies into your virtual environment: +``` +pip install .[tests] +``` +### Unit Tests +To run unit tests locally, from the connector directory run: +``` +python -m pytest unit_tests +``` + +### Integration Tests +There are two types of integration tests: Acceptance Tests (Airbyte's test suite for all source connectors) and custom integration tests (which are specific to this connector). +#### Custom Integration tests +Place custom tests inside `integration_tests/` folder, then, from the connector root, run +``` +python -m pytest integration_tests +``` +#### Acceptance Tests +Customize `acceptance-test-config.yml` file to configure tests. See [Source Acceptance Tests](https://docs.airbyte.io/connector-development/testing-connectors/source-acceptance-tests-reference) for more information. +If your connector requires to create or destroy resources for use during acceptance tests create fixtures for it and place them inside integration_tests/acceptance.py. +To run your integration tests with acceptance tests, from the connector root, run +``` +python -m pytest integration_tests -p integration_tests.acceptance +``` +To run your integration tests with docker + +### Using gradle to run tests +All commands should be run from airbyte project root. +To run unit tests: +``` +./gradlew :airbyte-integrations:connectors:source-marketo:unitTest +``` +To run acceptance and custom integration tests: +``` +./gradlew :airbyte-integrations:connectors:source-marketo:integrationTest +``` + +## Dependency Management +All of your dependencies should go in `setup.py`, NOT `requirements.txt`. The requirements file is only used to connect internal Airbyte dependencies in the monorepo for local development. +We split dependencies between two groups, dependencies that are: +* required for your connector to work need to go to `MAIN_REQUIREMENTS` list. +* required for the testing need to go to `TEST_REQUIREMENTS` list + +### Publishing a new version of the connector +You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what? +1. Make sure your changes are passing unit and integration tests. +1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)). +1. Create a Pull Request. +1. Pat yourself on the back for being an awesome contributor. +1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master. diff --git a/airbyte-integrations/connectors/source-marketo/acceptance-test-config.yml b/airbyte-integrations/connectors/source-marketo/acceptance-test-config.yml new file mode 100644 index 0000000000000..42a702315a868 --- /dev/null +++ b/airbyte-integrations/connectors/source-marketo/acceptance-test-config.yml @@ -0,0 +1,27 @@ +# See [Source Acceptance Tests](https://docs.airbyte.io/connector-development/testing-connectors/source-acceptance-tests-reference) +# for more information about how to configure these tests +connector_image: airbyte/source-marketo:dev +tests: + spec: + - spec_path: "source_marketo/spec.json" + connection: + - config_path: "secrets/config.json" + status: "succeed" + - config_path: "integration_tests/invalid_config.json" + status: "failed" + discovery: + - config_path: "secrets/config.json" +# basic_read: +# - config_path: "secrets/config.json" +# configured_catalog_path: "integration_tests/configured_catalog.json" +# empty_streams: [] +# timeout_seconds: 3600 + incremental: + - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" + future_state_path: "integration_tests/abnormal_state.json" + timeout_seconds: 3600 +# full_refresh: +# - config_path: "secrets/config.json" +# configured_catalog_path: "integration_tests/configured_catalog.json" +# timeout_seconds: 3600 diff --git a/airbyte-integrations/connectors/source-marketo/acceptance-test-docker.sh b/airbyte-integrations/connectors/source-marketo/acceptance-test-docker.sh new file mode 100644 index 0000000000000..e4d8b1cef8961 --- /dev/null +++ b/airbyte-integrations/connectors/source-marketo/acceptance-test-docker.sh @@ -0,0 +1,16 @@ +#!/usr/bin/env sh + +# Build latest connector image +docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2) + +# Pull latest acctest image +docker pull airbyte/source-acceptance-test:latest + +# Run +docker run --rm -it \ + -v /var/run/docker.sock:/var/run/docker.sock \ + -v /tmp:/tmp \ + -v $(pwd):/test_input \ + airbyte/source-acceptance-test \ + --acceptance-test-config /test_input + diff --git a/airbyte-integrations/connectors/source-marketo/acceptance_tests_logs/test_core.py::TestBasicRead::test_read[inputs0].txt b/airbyte-integrations/connectors/source-marketo/acceptance_tests_logs/test_core.py::TestBasicRead::test_read[inputs0].txt new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/airbyte-integrations/connectors/source-marketo/acceptance_tests_logs/test_full_refresh.py::TestFullRefresh::test_sequential_reads[inputs0].txt b/airbyte-integrations/connectors/source-marketo/acceptance_tests_logs/test_full_refresh.py::TestFullRefresh::test_sequential_reads[inputs0].txt new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/airbyte-integrations/connectors/source-marketo/build.gradle b/airbyte-integrations/connectors/source-marketo/build.gradle new file mode 100644 index 0000000000000..63759ef1c64df --- /dev/null +++ b/airbyte-integrations/connectors/source-marketo/build.gradle @@ -0,0 +1,14 @@ +plugins { + id 'airbyte-python' + id 'airbyte-docker' + id 'airbyte-source-acceptance-test' +} + +airbytePython { + moduleDirectory 'source_marketo' +} + +dependencies { + implementation files(project(':airbyte-integrations:bases:source-acceptance-test').airbyteDocker.outputs) + implementation files(project(':airbyte-integrations:bases:base-python').airbyteDocker.outputs) +} diff --git a/airbyte-integrations/connectors/source-marketo/integration_tests/__init__.py b/airbyte-integrations/connectors/source-marketo/integration_tests/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/airbyte-integrations/connectors/source-marketo/integration_tests/abnormal_state.json b/airbyte-integrations/connectors/source-marketo/integration_tests/abnormal_state.json new file mode 100644 index 0000000000000..a4f7b59a29b9b --- /dev/null +++ b/airbyte-integrations/connectors/source-marketo/integration_tests/abnormal_state.json @@ -0,0 +1,17 @@ +{ + "programs": { + "updatedAt": "2025-07-15T07:41:30Z" + }, + "campaigns": { + "createdAt": "2025-07-15T07:41:30Z" + }, + "lists": { + "createdAt": "2025-07-15T07:41:30Z" + }, + "leads": { + "updatedAt": "2025-07-15T07:41:30Z" + }, + "activities_visit_webpage": { + "activityDate": "2025-07-15T07:41:30Z" + } +} diff --git a/airbyte-integrations/connectors/source-marketo/integration_tests/acceptance.py b/airbyte-integrations/connectors/source-marketo/integration_tests/acceptance.py new file mode 100644 index 0000000000000..d6cbdc97c495c --- /dev/null +++ b/airbyte-integrations/connectors/source-marketo/integration_tests/acceptance.py @@ -0,0 +1,34 @@ +# +# MIT License +# +# Copyright (c) 2020 Airbyte +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# + + +import pytest + +pytest_plugins = ("source_acceptance_test.plugin",) + + +@pytest.fixture(scope="session", autouse=True) +def connector_setup(): + """ This fixture is a placeholder for external resources that acceptance test might require.""" + yield diff --git a/airbyte-integrations/connectors/source-marketo/integration_tests/catalog.json b/airbyte-integrations/connectors/source-marketo/integration_tests/catalog.json new file mode 100644 index 0000000000000..faf18a17a46bb --- /dev/null +++ b/airbyte-integrations/connectors/source-marketo/integration_tests/catalog.json @@ -0,0 +1,35 @@ +{ + "streams": [ + { + "stream": { + "name": "programs", + "json_schema": {}, + "supported_sync_modes": ["incremental"], + "source_defined_cursor": true, + "default_cursor_field": [] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "campaigns", + "json_schema": {}, + "supported_sync_modes": ["incremental"], + "source_defined_cursor": true, + "default_cursor_field": [] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "activity_types", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "append" + } + ] +} diff --git a/airbyte-integrations/connectors/source-marketo/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-marketo/integration_tests/configured_catalog.json new file mode 100644 index 0000000000000..7f9eb023b6aa5 --- /dev/null +++ b/airbyte-integrations/connectors/source-marketo/integration_tests/configured_catalog.json @@ -0,0 +1,68 @@ +{ + "streams": [ + { + "stream": { + "name": "programs", + "json_schema": {}, + "supported_sync_modes": ["incremental"], + "source_defined_cursor": true, + "default_cursor_field": [] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "campaigns", + "json_schema": {}, + "supported_sync_modes": ["incremental"], + "source_defined_cursor": true, + "default_cursor_field": [] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "lists", + "json_schema": {}, + "supported_sync_modes": ["incremental"], + "source_defined_cursor": true, + "default_cursor_field": [] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "leads", + "json_schema": {}, + "supported_sync_modes": ["incremental"], + "source_defined_cursor": true, + "default_cursor_field": [] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "activity_types", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "activities_visit_webpage", + "json_schema": {}, + "supported_sync_modes": ["incremental"], + "source_defined_cursor": true, + "default_cursor_field": [] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + } + ] +} diff --git a/airbyte-integrations/connectors/source-marketo/integration_tests/invalid_config.json b/airbyte-integrations/connectors/source-marketo/integration_tests/invalid_config.json new file mode 100644 index 0000000000000..416cd1af83c80 --- /dev/null +++ b/airbyte-integrations/connectors/source-marketo/integration_tests/invalid_config.json @@ -0,0 +1,7 @@ +{ + "client_id": "invalid_client_id", + "client_secret": "invalid_client_secret", + "base_url": "invalid_base_url", + "start_date": "0000-00-00T00:00:00Z", + "window_in_days": 2 +} diff --git a/airbyte-integrations/connectors/source-marketo/integration_tests/sample_config.json b/airbyte-integrations/connectors/source-marketo/integration_tests/sample_config.json new file mode 100644 index 0000000000000..1beb13712a9b4 --- /dev/null +++ b/airbyte-integrations/connectors/source-marketo/integration_tests/sample_config.json @@ -0,0 +1,7 @@ +{ + "client_id": "client_id", + "client_secret": "client_secret", + "base_url": "base_url", + "start_date": "0000-00-00T00:00:00Z", + "window_in_days": 2 +} diff --git a/airbyte-integrations/connectors/source-marketo/integration_tests/sample_state.json b/airbyte-integrations/connectors/source-marketo/integration_tests/sample_state.json new file mode 100644 index 0000000000000..be2c4e22ee15c --- /dev/null +++ b/airbyte-integrations/connectors/source-marketo/integration_tests/sample_state.json @@ -0,0 +1,17 @@ +{ + "programs": { + "updatedAt": "2021-07-15T07:41:30Z" + }, + "campaigns": { + "createdAt": "2021-07-15T07:41:30Z" + }, + "lists": { + "createdAt": "2021-07-15T07:41:30Z" + }, + "leads": { + "updatedAt": "2021-07-15T07:41:30Z" + }, + "activities_visit_webpage": { + "activityDate": "2021-07-15T07:41:30Z" + } +} diff --git a/airbyte-integrations/connectors/source-marketo/main.py b/airbyte-integrations/connectors/source-marketo/main.py new file mode 100644 index 0000000000000..354e4dd7f45bd --- /dev/null +++ b/airbyte-integrations/connectors/source-marketo/main.py @@ -0,0 +1,33 @@ +# +# MIT License +# +# Copyright (c) 2020 Airbyte +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# + + +import sys + +from airbyte_cdk.entrypoint import launch +from source_marketo import SourceMarketo + +if __name__ == "__main__": + source = SourceMarketo() + launch(source, sys.argv[1:]) diff --git a/airbyte-integrations/connectors/source-marketo/requirements.txt b/airbyte-integrations/connectors/source-marketo/requirements.txt new file mode 100644 index 0000000000000..0411042aa0911 --- /dev/null +++ b/airbyte-integrations/connectors/source-marketo/requirements.txt @@ -0,0 +1,2 @@ +-e ../../bases/source-acceptance-test +-e . diff --git a/airbyte-integrations/connectors/source-marketo/setup.py b/airbyte-integrations/connectors/source-marketo/setup.py new file mode 100644 index 0000000000000..758afbc26d109 --- /dev/null +++ b/airbyte-integrations/connectors/source-marketo/setup.py @@ -0,0 +1,49 @@ +# +# MIT License +# +# Copyright (c) 2020 Airbyte +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# + + +from setuptools import find_packages, setup + +MAIN_REQUIREMENTS = [ + "airbyte-cdk", +] + +TEST_REQUIREMENTS = [ + "pytest~=6.1", + "pytest-mock~=3.6.1", + "source-acceptance-test", +] + +setup( + name="source_marketo", + description="Source implementation for Marketo.", + author="Airbyte", + author_email="contact@airbyte.io", + packages=find_packages(), + install_requires=MAIN_REQUIREMENTS, + package_data={"": ["*.json", "schemas/*.json", "schemas/shared/*.json"]}, + extras_require={ + "tests": TEST_REQUIREMENTS, + }, +) diff --git a/airbyte-integrations/connectors/source-marketo/source_marketo/__init__.py b/airbyte-integrations/connectors/source-marketo/source_marketo/__init__.py new file mode 100644 index 0000000000000..096f565805a4f --- /dev/null +++ b/airbyte-integrations/connectors/source-marketo/source_marketo/__init__.py @@ -0,0 +1,28 @@ +# +# MIT License +# +# Copyright (c) 2020 Airbyte +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# + + +from .source import SourceMarketo + +__all__ = ["SourceMarketo"] diff --git a/airbyte-integrations/connectors/source-marketo/source_marketo/schemas/activity_types.json b/airbyte-integrations/connectors/source-marketo/source_marketo/schemas/activity_types.json new file mode 100644 index 0000000000000..dc114486d82d4 --- /dev/null +++ b/airbyte-integrations/connectors/source-marketo/source_marketo/schemas/activity_types.json @@ -0,0 +1,40 @@ +{ + "type": "object", + "additionalProperties": false, + "properties": { + "id": { + "type": "integer" + }, + "name": { + "type": "string" + }, + "description": { + "type": ["string", "null"] + }, + "primaryAttribute": { + "type": ["object", "null"], + "properties": { + "name": { + "type": "string" + }, + "dataType": { + "type": "string" + } + } + }, + "attributes": { + "type": ["array", "null"], + "items": { + "type": "object", + "properties": { + "name": { + "type": "string" + }, + "dataType": { + "type": "string" + } + } + } + } + } +} diff --git a/airbyte-integrations/connectors/source-marketo/source_marketo/schemas/campaigns.json b/airbyte-integrations/connectors/source-marketo/source_marketo/schemas/campaigns.json new file mode 100644 index 0000000000000..33ef4be032b89 --- /dev/null +++ b/airbyte-integrations/connectors/source-marketo/source_marketo/schemas/campaigns.json @@ -0,0 +1,38 @@ +{ + "type": "object", + "additionalProperties": false, + "properties": { + "id": { + "type": "integer" + }, + "createdAt": { + "type": "string", + "format": "date-time" + }, + "updatedAt": { + "type": "string", + "format": "date-time" + }, + "active": { + "type": ["boolean", "null"] + }, + "description": { + "type": ["string", "null"] + }, + "name": { + "type": "string" + }, + "programId": { + "type": ["integer", "null"] + }, + "programName": { + "type": ["string", "null"] + }, + "type": { + "type": "string" + }, + "workspaceName": { + "type": ["string", "null"] + } + } +} diff --git a/airbyte-integrations/connectors/source-marketo/source_marketo/schemas/leads.json b/airbyte-integrations/connectors/source-marketo/source_marketo/schemas/leads.json new file mode 100644 index 0000000000000..566d75d85613d --- /dev/null +++ b/airbyte-integrations/connectors/source-marketo/source_marketo/schemas/leads.json @@ -0,0 +1,281 @@ +{ + "type": "object", + "additionalProperties": false, + "properties": { + "company": { + "type": ["string", "null"] + }, + "site": { + "type": ["string", "null"] + }, + "billingStreet": { + "type": ["string", "null"] + }, + "billingCity": { + "type": ["string", "null"] + }, + "billingState": { + "type": ["string", "null"] + }, + "billingCountry": { + "type": ["string", "null"] + }, + "billingPostalCode": { + "type": ["string", "null"] + }, + "website": { + "type": ["string", "null"] + }, + "mainPhone": { + "type": ["string", "null"] + }, + "annualRevenue": { + "type": ["number", "null"] + }, + "numberOfEmployees": { + "type": ["integer", "null"] + }, + "industry": { + "type": ["string", "null"] + }, + "sicCode": { + "type": ["string", "null"] + }, + "mktoCompanyNotes": { + "type": ["string", "null"] + }, + "externalCompanyId": { + "type": ["string", "null"] + }, + "id": { + "type": "integer" + }, + "mktoName": { + "type": ["string", "null"] + }, + "personType": { + "type": ["string", "null"] + }, + "mktoIsPartner": { + "type": ["boolean", "null"] + }, + "isLead": { + "type": ["boolean", "null"] + }, + "mktoIsCustomer": { + "type": ["boolean", "null"] + }, + "isAnonymous": { + "type": ["boolean", "null"] + }, + "salutation": { + "type": ["string", "null"] + }, + "firstName": { + "type": ["string", "null"] + }, + "middleName": { + "type": ["string", "null"] + }, + "lastName": { + "type": ["string", "null"] + }, + "email": { + "type": ["string", "null"] + }, + "phone": { + "type": ["string", "null"] + }, + "mobilePhone": { + "type": ["string", "null"] + }, + "fax": { + "type": ["string", "null"] + }, + "title": { + "type": ["string", "null"] + }, + "contactCompany": { + "type": ["string", "null"] + }, + "dateOfBirth": { + "type": ["string", "null"], + "format": "date-time" + }, + "address": { + "type": ["string", "null"] + }, + "city": { + "type": ["string", "null"] + }, + "state": { + "type": ["string", "null"] + }, + "country": { + "type": ["string", "null"] + }, + "postalCode": { + "type": ["string", "null"] + }, + "personTimeZone": { + "type": ["string", "null"] + }, + "originalSourceType": { + "type": ["string", "null"] + }, + "originalSourceInfo": { + "type": ["string", "null"] + }, + "registrationSourceType": { + "type": ["string", "null"] + }, + "registrationSourceInfo": { + "type": ["string", "null"] + }, + "originalSearchEngine": { + "type": ["string", "null"] + }, + "originalSearchPhrase": { + "type": ["string", "null"] + }, + "originalReferrer": { + "type": ["string", "null"] + }, + "emailInvalid": { + "type": ["boolean", "null"] + }, + "emailInvalidCause": { + "type": ["string", "null"] + }, + "unsubscribed": { + "type": ["boolean", "null"] + }, + "unsubscribedReason": { + "type": ["string", "null"] + }, + "doNotCall": { + "type": ["boolean", "null"] + }, + "mktoDoNotCallCause": { + "type": ["string", "null"] + }, + "doNotCallReason": { + "type": ["string", "null"] + }, + "marketingSuspended": { + "type": ["boolean", "null"] + }, + "marketingSuspendedCause": { + "type": ["string", "null"] + }, + "blackListed": { + "type": ["boolean", "null"] + }, + "blackListedCause": { + "type": ["string", "null"] + }, + "mktoPersonNotes": { + "type": ["string", "null"] + }, + "anonymousIP": { + "type": ["string", "null"] + }, + "inferredCompany": { + "type": ["string", "null"] + }, + "inferredCountry": { + "type": ["string", "null"] + }, + "inferredCity": { + "type": ["string", "null"] + }, + "inferredStateRegion": { + "type": ["string", "null"] + }, + "inferredPostalCode": { + "type": ["string", "null"] + }, + "inferredMetropolitanArea": { + "type": ["string", "null"] + }, + "inferredPhoneAreaCode": { + "type": ["string", "null"] + }, + "emailSuspended": { + "type": ["boolean", "null"] + }, + "emailSuspendedCause": { + "type": ["string", "null"] + }, + "emailSuspendedAt": { + "type": ["string", "null"], + "format": "date-time" + }, + "department": { + "type": ["string", "null"] + }, + "createdAt": { + "type": "string", + "format": "date-time" + }, + "updatedAt": { + "type": "string", + "format": "date-time" + }, + "cookies": { + "type": ["string", "null"] + }, + "externalSalesPersonId": { + "type": ["string", "null"] + }, + "leadPerson": { + "type": ["string", "null"] + }, + "leadRole": { + "type": ["string", "null"] + }, + "leadSource": { + "type": ["string", "null"] + }, + "leadStatus": { + "type": ["string", "null"] + }, + "leadScore": { + "type": ["integer", "null"] + }, + "urgency": { + "type": ["number", "null"] + }, + "priority": { + "type": ["integer", "null"] + }, + "relativeScore": { + "type": ["integer", "null"] + }, + "relativeUrgency": { + "type": ["integer", "null"] + }, + "rating": { + "type": ["string", "null"] + }, + "personPrimaryLeadInterest": { + "type": ["string", "null"] + }, + "leadPartitionId": { + "type": ["string", "null"] + }, + "leadRevenueCycleModelId": { + "type": ["string", "null"] + }, + "leadRevenueStageId": { + "type": ["string", "null"] + }, + "acquisitionProgramId": { + "type": ["string", "null"] + }, + "mktoAcquisitionDate": { + "type": ["string", "null"], + "format": "date-time" + } + } +} diff --git a/airbyte-integrations/connectors/source-marketo/source_marketo/schemas/lists.json b/airbyte-integrations/connectors/source-marketo/source_marketo/schemas/lists.json new file mode 100644 index 0000000000000..d240c421a3c3f --- /dev/null +++ b/airbyte-integrations/connectors/source-marketo/source_marketo/schemas/lists.json @@ -0,0 +1,32 @@ +{ + "type": "object", + "additionalProperties": false, + "properties": { + "id": { + "type": "integer" + }, + "name": { + "type": "string" + }, + "createdAt": { + "type": "string", + "format": "date-time" + }, + "updatedAt": { + "type": "string", + "format": "date-time" + }, + "description": { + "type": ["string", "null"] + }, + "programName": { + "type": ["string", "null"] + }, + "workspaceName": { + "type": ["string", "null"] + }, + "workspaceId": { + "type": "integer" + } + } +} diff --git a/airbyte-integrations/connectors/source-marketo/source_marketo/schemas/programs.json b/airbyte-integrations/connectors/source-marketo/source_marketo/schemas/programs.json new file mode 100644 index 0000000000000..d1c1be8181045 --- /dev/null +++ b/airbyte-integrations/connectors/source-marketo/source_marketo/schemas/programs.json @@ -0,0 +1,52 @@ +{ + "type": "object", + "additionalProperties": false, + "properties": { + "id": { + "type": "integer" + }, + "createdAt": { + "type": "string", + "format": "date-time" + }, + "updatedAt": { + "type": "string", + "format": "date-time" + }, + "name": { + "type": "string" + }, + "description": { + "type": ["null", "string"] + }, + "url": { + "type": ["null", "string"] + }, + "type": { + "type": ["null", "string"] + }, + "channel": { + "type": ["null", "string"] + }, + "status": { + "type": ["null", "string"] + }, + "workspace": { + "type": ["null", "string"] + }, + "folder": { + "type": "object", + "properties": { + "type": { + "type": ["null", "string"] + }, + "value": { + "type": ["null", "integer"] + }, + "folderName": { + "type": ["null", "string"] + } + } + } + } +} diff --git a/airbyte-integrations/connectors/source-marketo/source_marketo/source.py b/airbyte-integrations/connectors/source-marketo/source_marketo/source.py new file mode 100644 index 0000000000000..e57de9dca3e20 --- /dev/null +++ b/airbyte-integrations/connectors/source-marketo/source_marketo/source.py @@ -0,0 +1,530 @@ +# +# MIT License +# +# Copyright (c) 2020 Airbyte +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# + + +import csv +import json +from abc import ABC +from time import sleep +from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple + +import pendulum +import requests +from airbyte_cdk.models import SyncMode +from airbyte_cdk.sources import AbstractSource +from airbyte_cdk.sources.streams import Stream +from airbyte_cdk.sources.streams.http import HttpStream +from airbyte_cdk.sources.streams.http.auth import Oauth2Authenticator + +from .utils import STRING_TYPES, clean_string, format_value, to_datetime_str + + +class MarketoStream(HttpStream, ABC): + primary_key = "id" + data_field = "result" + page_size = 300 + + def __init__(self, config: Mapping[str, Any], stream_name: str = None, param: Mapping[str, Any] = None, export_id: int = None): + super().__init__(authenticator=config["authenticator"]) + self.config = config + self.start_date = config["start_date"] + self.window_in_days = config["window_in_days"] + self._url_base = config["base_url"] + self.stream_name = stream_name + self.param = param + self.export_id = export_id + + @property + def url_base(self) -> str: + return self._url_base + + def path(self, **kwargs) -> str: + return f"/rest/v1/{self.name}.json" + + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + next_page = response.json().get("nextPageToken") + + if next_page: + return {"nextPageToken": next_page} + + def request_params( + self, + stream_state: Mapping[str, Any], + stream_slice: Mapping[str, Any] = None, + next_page_token: Mapping[str, Any] = None, + ) -> MutableMapping[str, Any]: + params = {"batchSize": self.page_size} + if next_page_token: + params.update(**next_page_token) + return params + + def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]: + data = response.json().get(self.data_field, []) + + for record in data: + yield record + + +class IncrementalMarketoStream(MarketoStream): + cursor_field = "createdAt" + + def filter_by_state(self, stream_state: Mapping[str, Any] = None, record: Mapping[str, Any] = None) -> Iterable: + """ + Endpoint does not provide query filtering params, but they provide us + cursor field in most cases, so we used that as incremental filtering + during the parsing. + """ + + if not stream_state or record[self.cursor_field] >= stream_state.get(self.cursor_field): + yield record + + def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]: + json_response = response.json().get(self.data_field) or [] + + for record in json_response: + yield from self.filter_by_state(stream_state=stream_state, record=record) + + def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: + return { + self.cursor_field: max(latest_record.get(self.cursor_field, ""), current_stream_state.get(self.cursor_field, self.start_date)) + } + + def stream_slices(self, sync_mode, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, any]]]: + """ + Override default stream_slices CDK method to provide date_slices as page chunks for data fetch. + Returns list of dict, example: [{ + "startDate": "2020-01-01T0:0:0Z", + "endDate": "2021-01-02T0:0:0Z" + }, + { + "startDate": "2020-01-03T0:0:0Z", + "endDate": "2021-01-04T0:0:0Z" + }, + ...] + """ + + start_date = pendulum.parse(self.start_date) + end_date = pendulum.now() + + # Determine stream_state, if no stream_state we use start_date + if stream_state: + start_date = pendulum.parse(stream_state.get(self.cursor_field)) + + # use the lowest date between start_date and self.end_date, otherwise API fails if start_date is in future + start_date = min(start_date, end_date) + date_slices = [] + + while start_date <= end_date: + end_date_slice = start_date.add(days=self.window_in_days) + date_slice = {"startAt": to_datetime_str(start_date), "endAt": to_datetime_str(end_date_slice)} + + date_slices.append(date_slice) + start_date = end_date_slice + + return date_slices + + +class MarketoExportBase(IncrementalMarketoStream): + """ + Base class for all the streams which support bulk extract. + """ + + job_timeout = 60 * 180 + poll_interval = 60 + + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + return None + + @property + def stream_fields(self): + return {} + + @property + def stream_filter(self): + return {} + + def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: + return f"/bulk/v1/{self.stream_name}/export/{stream_slice['id']}/file.json" + + def stream_slices(self, sync_mode, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, any]]]: + date_slices = super().stream_slices(sync_mode, stream_state, **kwargs) + + for date_slice in date_slices: + param = {"fields": [], "filter": {"createdAt": date_slice}} + param["fields"].extend(self.stream_fields) + param["filter"].update(self.stream_filter) + + export = next(MarketoExportCreate(self.config, stream_name=self.stream_name, param=param).read_records(sync_mode=None), {}) + + date_slice["id"] = export["exportId"] + return date_slices + + def sleep_till_export_completed(self, stream_slice: Mapping[str, Any]) -> bool: + timeout_time = pendulum.now().add(seconds=self.job_timeout) + while pendulum.now() < timeout_time: + status = next( + MarketoExportStatus(self.config, stream_name=self.stream_name, export_id=stream_slice["id"]).read_records(sync_mode=None) + ) + self.logger.info(f"Export {self.name} from {stream_slice['startAt']} to {stream_slice['endAt']} status is {status}") + + if status == "Created": + # If the status is created, the export has been made but + # not started, so enqueue the export. + next( + MarketoExportStart(self.config, stream_name=self.stream_name, export_id=stream_slice["id"]).read_records(sync_mode=None) + ) + + elif status in ["Cancelled", "Failed"]: + # Cancelled and failed exports fail the current sync. + raise Exception(status) + + elif status == "Completed": + return True + + sleep(self.poll_interval) + + raise Exception(f"Export timed out after {self.job_timeout / 60} minutes") + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + list_response = response.text.rstrip("\n").split("\n") + + headers = list_response[0].split(",") + + schema = self.get_json_schema()["properties"] + + for values in list_response[1:]: + record = { + headers[i]: format_value(value, schema[headers[i]]) + for i, value in enumerate(next(csv.reader([values], skipinitialspace=True))) + } + + if "attributes" in headers: + attributes_records = { + clean_string(key): format_value(value, schema[clean_string(key)]) + for key, value in json.loads(record["attributes"]).items() + } + record.update(attributes_records) + record.pop("attributes") + + yield record + + def read_records( + self, + sync_mode: SyncMode, + cursor_field: List[str] = None, + stream_slice: Mapping[str, Any] = None, + stream_state: Mapping[str, Any] = None, + ) -> Iterable[Mapping[str, Any]]: + self.sleep_till_export_completed(stream_slice) + return super().read_records(sync_mode, cursor_field, stream_slice, stream_state) + + +class MarketoExportCreate(MarketoStream): + """ + Provides functionality to create Marketo export. + Return list with dict, example: + [ + { + "exportId": "141bas21-146c-4a43-8c72-280sder596c34", + "format": "CSV", + "status": "Created", + "createdAt": "2021-09-01T10:09:39Z" + } + ] + """ + + http_method = "POST" + + def path(self, **kwargs) -> str: + return f"/bulk/v1/{self.stream_name}/export/create.json" + + def request_body_json(self, **kwargs) -> Optional[Mapping]: + params = {"format": "CSV"} + if self.param: + params.update(self.param) + + return params + + +class MarketoExportStart(MarketoStream): + """ + Provides functionality to start Marketo export. + Return list with dict, example: + [ + { + "exportId": "1689f995-1397-48b2-b88a-5eed1397299b", + "format": "CSV", + "status": "Queued", + "createdAt": "2021-09-01T10:00:50Z", + "queuedAt": "2021-09-01T10:01:07Z" + } + ] + """ + + http_method = "POST" + + def path(self, **kwargs) -> str: + return f"/bulk/v1/{self.stream_name}/export/{self.export_id}/enqueue.json" + + +class MarketoExportStatus(MarketoStream): + """ + Provides functionality to get status of Marketo export. + Return string with dict, example: "Completed" + """ + + def path(self, **kwargs) -> str: + return f"/bulk/v1/{self.stream_name}/export/{self.export_id}/status.json" + + def parse_response(self, response: requests.Response, **kwargs) -> List[str]: + return [response.json()[self.data_field][0]["status"]] + + +class Leads(MarketoExportBase): + """ + Return list of all leeds. + API Docs: http://developers.marketo.com/rest-api/bulk-extract/bulk-lead-extract/ + """ + + cursor_field = "updatedAt" + + def __init__(self, config: Mapping[str, Any]): + super().__init__(config, "leads") + + @property + def stream_fields(self): + return list(self.get_json_schema()["properties"].keys()) + + +class Activities(MarketoExportBase): + """ + Base class for all the activities streams, + provides functionality for dynamically created classes as streams of data. + API Docs: http://developers.marketo.com/rest-api/bulk-extract/bulk-activity-extract/ + """ + + primary_key = "marketoGUID" + cursor_field = "activityDate" + + def __init__(self, config: Mapping[str, Any]): + super().__init__(config, "activities") + + @property + def stream_filter(self): + return {"activityTypeIds": [self.activity["id"]]} + + def get_json_schema(self) -> Mapping[str, Any]: + properties = { + "marketoGUID": {"type": ["null", "string"]}, + "leadId": {"type": ["null", "integer"]}, + "activityDate": {"type": ["null", "string"], "format": "date-time"}, + "activityTypeId": {"type": ["null", "integer"]}, + "campaignId": {"type": ["null", "integer"]}, + "attributes": {"type": ["null", "string"]}, + } + + if "primaryAttribute" in self.activity: + properties["primaryAttributeValue"] = {"type": ["null", "string"]} + properties["primaryAttributeName"] = {"type": ["null", "string"]} + properties["primaryAttributeValueId"] = {"type": ["null", "string"]} + + if "attributes" in self.activity: + for attr in self.activity["attributes"]: + attr_name = clean_string(attr["name"]) + + if attr["dataType"] in ["datetime", "date"]: + field_schema = {"type": "string", "format": "date-time"} + elif attr["dataType"] in ["integer", "percent", "score"]: + field_schema = {"type": "integer"} + elif attr["dataType"] in ["float", "currency"]: + field_schema = {"type": "number"} + elif attr["dataType"] == "boolean": + field_schema = {"type": "boolean"} + elif attr["dataType"] in STRING_TYPES: + field_schema = {"type": "string"} + elif attr["dataType"] in ["array"]: + field_schema = {"type": "array", "items": {"type": ["integer", "number", "string", "null"]}} + else: + field_schema = {"type": "string"} + + field_schema["type"] = [field_schema["type"], "null"] + + properties[attr_name] = field_schema + + schema = { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": ["null", "object"], + "additionalProperties": False, + "properties": properties, + } + + return schema + + +class ActivityTypes(MarketoStream): + """ + Return list of all activity types. + API Docs: https://developers.intercom.com/intercom-api-reference/reference#list-admins + """ + + def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: + return "/rest/v1/activities/types.json" + + +class Programs(IncrementalMarketoStream): + """ + Return list of all admins. + API Docs: http://developers.marketo.com/rest-api/assets/programs/#by_date_range + """ + + cursor_field = "updatedAt" + page_size = 200 + offset = 0 + + def path(self, **kwargs) -> str: + return f"/rest/asset/v1/{self.name}.json" + + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + data = response.json().get(self.data_field) + + if data: + self.offset += self.page_size + 1 + return {"offset": self.offset} + + def request_params( + self, + stream_state: Mapping[str, Any], + stream_slice: Mapping[str, Any] = None, + next_page_token: Mapping[str, Any] = None, + ) -> MutableMapping[str, Any]: + """ + Programs are queryable via their updatedAt time but require and + end date as well. As there is no max time range for the query, + query from the bookmark value until current. + """ + + params = super().request_params(stream_state, stream_slice, next_page_token) + params.update( + { + "maxReturn": self.page_size, + "earliestUpdatedAt": stream_slice["startAt"], + "latestUpdatedAt": stream_slice["endAt"], + } + ) + + return params + + def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]: + for record in super().parse_response(response, stream_state, **kwargs): + record["updatedAt"] = record["updatedAt"][:-5] + yield record + + +class Campaigns(IncrementalMarketoStream): + """ + Return list of all campaigns. + API Docs: http://developers.marketo.com/rest-api/endpoint-reference/lead-database-endpoint-reference/#!/Campaigns/getCampaignsUsingGET + """ + + pass + + +class Lists(IncrementalMarketoStream): + """ + Return list of all lists. + API Docs: http://developers.marketo.com/rest-api/endpoint-reference/lead-database-endpoint-reference/#!/Static_Lists/getListsUsingGET + """ + + pass + + +class MarketoAuthenticator(Oauth2Authenticator): + def __init__(self, config): + super().__init__( + token_refresh_endpoint=f"{config['base_url']}/identity/oauth/token", + client_id=config["client_id"], + client_secret=config["client_secret"], + refresh_token=None, + ) + + def get_refresh_request_params(self) -> Mapping[str, Any]: + payload: MutableMapping[str, Any] = { + "grant_type": "client_credentials", + "client_id": self.client_id, + "client_secret": self.client_secret, + } + + return payload + + def refresh_access_token(self) -> Tuple[str, int]: + """ + Returns a tuple of (access_token, token_lifespan_in_seconds) + """ + try: + response = requests.request(method="GET", url=self.token_refresh_endpoint, params=self.get_refresh_request_params()) + response.raise_for_status() + response_json = response.json() + # print(response_json["access_token"]) + return response_json["access_token"], response_json["expires_in"] + except Exception as e: + raise Exception(f"Error while refreshing access token: {e}") from e + + +class SourceMarketo(AbstractSource): + """ + Source Marketo fetch data of personalized multi-channel programs and campaigns to prospects and customers. + """ + + def check_connection(self, logger, config) -> Tuple[bool, any]: + """ + Testing connection availability for the connector by granting the credentials. + """ + + try: + url = f"{config['base_url']}/rest/v1/leads/describe" + + authenticator = MarketoAuthenticator(config) + + session = requests.get(url, headers=authenticator.get_auth_header()) + session.raise_for_status() + + return True, None + except requests.exceptions.RequestException as e: + return False, e + + def streams(self, config: Mapping[str, Any]) -> List[Stream]: + config["authenticator"] = MarketoAuthenticator(config) + + streams = [ActivityTypes(config), Campaigns(config), Leads(config), Lists(config), Programs(config)] + + for activity in ActivityTypes(config).read_records(sync_mode=None): + stream_name = f"activities_{clean_string(activity['name'])}" + + stream_class = type(stream_name, (Activities,), {"activity": activity}) + + # instantiate a stream with config + stream_instance = stream_class(config) + streams.append(stream_instance) + + return streams diff --git a/airbyte-integrations/connectors/source-marketo/source_marketo/spec.json b/airbyte-integrations/connectors/source-marketo/source_marketo/spec.json new file mode 100644 index 0000000000000..acf51f5c0c6b5 --- /dev/null +++ b/airbyte-integrations/connectors/source-marketo/source_marketo/spec.json @@ -0,0 +1,40 @@ +{ + "documentationUrl": "https://docs.airbyte.io/integrations/sources/marketo", + "connectionSpecification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Source Marketo Spec", + "type": "object", + "required": ["base_url", "client_id", "client_secret", "start_date"], + "additionalProperties": false, + "properties": { + "base_url": { + "type": "string", + "description": "Your Marketo Base URL. See the docs for info on how to obtain this.", + "examples": ["https://000-AAA-000.mktorest.com"], + "airbyte_secret": true + }, + "client_id": { + "type": "string", + "description": "Your Marketo client_id. See the docs for info on how to obtain this.", + "airbyte_secret": true + }, + "client_secret": { + "type": "string", + "description": "Your Marketo client secret. See the docs for info on how to obtain this.", + "airbyte_secret": true + }, + "start_date": { + "type": "string", + "description": "Data generated in Marketo after this date will be replicated. This date must be specified in the format YYYY-MM-DDT00:00:00Z.", + "examples": ["2020-09-25T00:00:00Z"], + "pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$" + }, + "window_in_days": { + "type": "integer", + "description": "The amount of days for each data-chunk begining from start_date. Bigger the value - faster the fetch. (Min=1, as for a Day; Max=30, as for a Month).", + "examples": [1, 5, 10, 15, 30], + "default": 30 + } + } + } +} diff --git a/airbyte-integrations/connectors/source-marketo/source_marketo/utils.py b/airbyte-integrations/connectors/source-marketo/source_marketo/utils.py new file mode 100644 index 0000000000000..5847c0ffee6e1 --- /dev/null +++ b/airbyte-integrations/connectors/source-marketo/source_marketo/utils.py @@ -0,0 +1,82 @@ +# +# MIT License +# +# Copyright (c) 2020 Airbyte +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# + + +from datetime import datetime + +STRING_TYPES = [ + "string", + "email", + "reference", + "url", + "phone", + "textarea", + "text", + "lead_function", +] + + +def to_datetime_str(date: datetime) -> str: + """ + Returns the formated datetime string. + :: Output example: '2021-07-15T0:0:0Z' FORMAT : "%Y-%m-%dT%H:%M:%SZ" + """ + return date.strftime("%Y-%m-%dT%H:%M:%SZ") + + +def clean_string(string: str) -> str: + abbreviations = ("URL", "GUID", "IP") + if any(map(lambda w: w in string, abbreviations)): + return string.lower().replace(" ", "_") + return "".join("_" + c.lower() if c.isupper() else c for c in string if c != " ").strip("_") + + +def format_value(value, schema): + if not isinstance(schema["type"], list): + field_type = [schema["type"]] + else: + field_type = schema["type"] + + if value in [None, "", "null"]: + return None + elif "integer" in field_type: + if isinstance(value, int): + return value + + # Custom Marketo percent type fields can have decimals, so we drop them + decimal_index = value.find(".") + if decimal_index > 0: + # singer.log_warning("Dropping decimal from integer type. Original Value: %s", value) + value = value[:decimal_index] + return int(value) + elif "string" in field_type: + return str(value) + elif "number" in field_type: + return float(value) + elif "boolean" in field_type: + if isinstance(value, bool): + return value + return value.lower() == "true" + + return value diff --git a/airbyte-integrations/connectors/source-marketo/unit_tests/__init__.py b/airbyte-integrations/connectors/source-marketo/unit_tests/__init__.py new file mode 100644 index 0000000000000..9db886e0930f0 --- /dev/null +++ b/airbyte-integrations/connectors/source-marketo/unit_tests/__init__.py @@ -0,0 +1,23 @@ +# +# MIT License +# +# Copyright (c) 2020 Airbyte +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# diff --git a/airbyte-integrations/connectors/source-marketo/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-marketo/unit_tests/unit_test.py new file mode 100644 index 0000000000000..306bedb6f2b71 --- /dev/null +++ b/airbyte-integrations/connectors/source-marketo/unit_tests/unit_test.py @@ -0,0 +1,60 @@ +# +# MIT License +# +# Copyright (c) 2020 Airbyte +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# + + +import pytest +from source_marketo.utils import clean_string, format_value + +test_data = [ + (1, {"type": "integer"}, int), + ("string", {"type": "string"}, str), + (True, {"type": ["boolean", "null"]}, bool), + (1, {"type": ["number", "null"]}, float), +] + + +@pytest.mark.parametrize("value,schema,expected_output_type", test_data) +def test_fromat_value(value, schema, expected_output_type): + test = format_value(value, schema) + + assert isinstance(test, expected_output_type) + + +test_data = [ + ("updatedAt", "updated_at"), + ("UpdatedAt", "updated_at"), + ("base URL", "base_url"), + ("UPdatedAt", "u_pdated_at"), + ("updated_at", "updated_at"), + (" updated_at ", "updated_at"), + ("updatedat", "updatedat"), + ("", ""), +] + + +@pytest.mark.parametrize("value,expected", test_data) +def test_clean_string(value, expected): + test = clean_string(value) + + assert test == expected diff --git a/docs/integrations/README.md b/docs/integrations/README.md index 7fd37c06c4473..4259ef0e13e98 100644 --- a/docs/integrations/README.md +++ b/docs/integrations/README.md @@ -52,7 +52,7 @@ Airbyte uses a grading system for connectors to help users understand what to ex |[Looker](./sources/looker.md)| Beta | |[Magento](./sources/magento.md)| Beta | |[Mailchimp](./sources/mailchimp.md)| Certified | -|[Marketo](./sources/marketo.md)| Certified | +|[Marketo](./sources/marketo.md)| Beta | |[Microsoft SQL Server \(MSSQL\)](./sources/mssql.md)| Certified | |[Microsoft Dynamics AX](./sources/microsoft-dynamics-ax.md)| Beta | |[Microsoft Dynamics Customer Engagement](./sources/microsoft-dynamics-customer-engagement.md)| Beta | diff --git a/docs/integrations/sources/marketo.md b/docs/integrations/sources/marketo.md index 04f58bba95372..3edd6211aae26 100644 --- a/docs/integrations/sources/marketo.md +++ b/docs/integrations/sources/marketo.md @@ -4,7 +4,7 @@ The Marketo source supports both Full Refresh and Incremental syncs. You can choose if this connector will copy only the new or updated data, or all rows in the tables and columns you set up for replication, every time a sync is run. -This connector is based on the [Singer Marketo Tap](https://github.com/singer-io/tap-marketo). +This connector is based on the [Airbyte CDK](https://docs.airbyte.io/connector-development/cdk-python). ### Output schema @@ -31,10 +31,10 @@ This connector can be used to sync the following tables from Marketo: Feature -| Supported?\(Yes/No\) | Notes | | -| :--- | :--- | :--- | -| Full Refresh Sync | Yes | | -| Incremental - Append Sync | Yes | | +| Supported?\(Yes/No\) | Notes | +| :--- | :--- | +| Full Refresh Sync | Yes | +| Incremental - Append Sync | Yes | ### Performance considerations @@ -53,7 +53,7 @@ If the 50,000 limit is too stringent, contact Marketo support for a quota increa * An Airbyte Marketo API-only user * A Marketo API Custom Service * Marketo Client ID & Client Secret -* Marketo REST API Base URLs +* Marketo Base URL ### Setup guide @@ -81,7 +81,7 @@ Make sure to follow the "**Credentials for API Access"** section in the Marketo #### Step 5: Obtain your Endpoint and Identity URLs provided by Marketo -Follow the [Marketo documentation for obtaining your base URL](https://developers.marketo.com/rest-api/base-url/). Specifically, copy your **Endpoint** and **Identity URLs** and keep them handy for use in the Airbyte UI. +Follow the [Marketo documentation for obtaining your base URL](https://developers.marketo.com/rest-api/base-url/). Specifically, copy your **Endpoint** without "/rest" and keep them handy for use in the Airbyte UI. We're almost there! Armed with your Endpoint & Identity URLs and your Client ID and Secret, head over to the Airbyte UI to setup Marketo as a source. @@ -91,4 +91,4 @@ We're almost there! Armed with your Endpoint & Identity URLs and your Client ID | Version | Date | Pull Request | Subject | | :------ | :-------- | :----- | :------ | -| `0.2.3` | 2021-07-06 | [4539](https://github.com/airbytehq/airbyte/pull/4539) | Add `AIRBYTE_ENTRYPOINT` for Kubernetes support | +| `0.1.0` | 2021-09-06 | [0000](https://github.com/airbytehq/airbyte/pull/0000) | Release Marketo CDK Connector| diff --git a/tools/bin/ci_credentials.sh b/tools/bin/ci_credentials.sh index 52422403f6a30..ec2ca5ae0e85a 100755 --- a/tools/bin/ci_credentials.sh +++ b/tools/bin/ci_credentials.sh @@ -78,6 +78,7 @@ write_standard_creds source-jira "$JIRA_INTEGRATION_TEST_CREDS" write_standard_creds source-klaviyo "$KLAVIYO_TEST_CREDS" write_standard_creds source-looker "$LOOKER_INTEGRATION_TEST_CREDS" write_standard_creds source-mailchimp "$MAILCHIMP_TEST_CREDS" +write_standard_creds source-marketo"$SOURCE_MARKETO_TEST_CREDS" write_standard_creds source-marketo-singer "$SOURCE_MARKETO_SINGER_INTEGRATION_TEST_CONFIG" write_standard_creds source-microsoft-teams "$MICROSOFT_TEAMS_TEST_CREDS" write_standard_creds source-mixpanel "$MIXPANEL_INTEGRATION_TEST_CREDS" From 0b8642c9d082a8972b744fd5494136b4e22697bd Mon Sep 17 00:00:00 2001 From: lazebnyi Date: Mon, 6 Sep 2021 20:08:48 +0300 Subject: [PATCH 2/7] Updated PR in docs --- airbyte-integrations/builds.md | 2 +- docs/integrations/sources/marketo.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/builds.md b/airbyte-integrations/builds.md index f58261de5afd8..3638fd56f7c3c 100644 --- a/airbyte-integrations/builds.md +++ b/airbyte-integrations/builds.md @@ -41,7 +41,7 @@ | Klaviyo | [![source-klaviyo](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fsource-klaviyo%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/source-klaviyo) | | Kustomer | [![source-kustomer](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fsource-kustomer%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/source-kustomer-singer) | | Mailchimp | [![source-mailchimp](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fsource-mailchimp%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/source-mailchimp) | -| Marketo | [![source-marketo-singer](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fsource-marketo-singer%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/source-marketo-singer) | +| Marketo | [![source-marketo](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fsource-marketo%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/source-marketo) | | Microsoft SQL Server \(MSSQL\) | [![source-mssql](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fsource-mssql%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/source-mssql) | | Microsoft Teams | [![source-microsoft-teams](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fsource-microsoft-teams%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/source-microsoft-teams) | | Mixpanel | [![source-mixpanel](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fsource-mixpanel%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/source-mixpanel) | diff --git a/docs/integrations/sources/marketo.md b/docs/integrations/sources/marketo.md index 3edd6211aae26..826542e3e9bd1 100644 --- a/docs/integrations/sources/marketo.md +++ b/docs/integrations/sources/marketo.md @@ -91,4 +91,4 @@ We're almost there! Armed with your Endpoint & Identity URLs and your Client ID | Version | Date | Pull Request | Subject | | :------ | :-------- | :----- | :------ | -| `0.1.0` | 2021-09-06 | [0000](https://github.com/airbytehq/airbyte/pull/0000) | Release Marketo CDK Connector| +| `0.1.0` | 2021-09-06 | [5863](https://github.com/airbytehq/airbyte/pull/5863) | Release Marketo CDK Connector| From 21b7b7864ce4c5c188ed23f9bb22c19af0166bc1 Mon Sep 17 00:00:00 2001 From: lazebnyi Date: Mon, 6 Sep 2021 20:11:21 +0300 Subject: [PATCH 3/7] Fix typo --- .../connectors/source-marketo/source_marketo/source.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-marketo/source_marketo/source.py b/airbyte-integrations/connectors/source-marketo/source_marketo/source.py index e57de9dca3e20..3cd6279fe4261 100644 --- a/airbyte-integrations/connectors/source-marketo/source_marketo/source.py +++ b/airbyte-integrations/connectors/source-marketo/source_marketo/source.py @@ -385,7 +385,7 @@ def get_json_schema(self) -> Mapping[str, Any]: class ActivityTypes(MarketoStream): """ Return list of all activity types. - API Docs: https://developers.intercom.com/intercom-api-reference/reference#list-admins + API Docs: http://developers.marketo.com/rest-api/lead-database/activities/#describe """ def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: @@ -394,7 +394,7 @@ def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: class Programs(IncrementalMarketoStream): """ - Return list of all admins. + Return list of all programs. API Docs: http://developers.marketo.com/rest-api/assets/programs/#by_date_range """ From ac401d170146bf661c470194667881398f3fbc19 Mon Sep 17 00:00:00 2001 From: lazebnyi Date: Mon, 6 Sep 2021 20:51:19 +0300 Subject: [PATCH 4/7] Add bootstrap file --- .../connectors/source-marketo/bootstrap.md | 31 +++++++++++++++++++ .../source-marketo/source_marketo/spec.json | 2 +- 2 files changed, 32 insertions(+), 1 deletion(-) create mode 100644 airbyte-integrations/connectors/source-marketo/bootstrap.md diff --git a/airbyte-integrations/connectors/source-marketo/bootstrap.md b/airbyte-integrations/connectors/source-marketo/bootstrap.md new file mode 100644 index 0000000000000..da150e9b5ace3 --- /dev/null +++ b/airbyte-integrations/connectors/source-marketo/bootstrap.md @@ -0,0 +1,31 @@ +## Core streams + +Marketo is a REST based API. Connector is implemented with [Airbyte CDK](https://docs.airbyte.io/connector-development/cdk-python). + +Connector has such core streams, and all of them except Activity_types support full refresh and incremental sync: +* [Activity\_types](https://developers.marketo.com/rest-api/endpoint-reference/lead-database-endpoint-reference/#!/Activities/getAllActivityTypesUsingGET). +* [Campaigns](https://developers.marketo.com/rest-api/endpoint-reference/lead-database-endpoint-reference/#!/Campaigns/getCampaignsUsingGET). +* [Lists](https://developers.marketo.com/rest-api/endpoint-reference/lead-database-endpoint-reference/#!/Static_Lists/getListByIdUsingGET). +* [Programs](https://developers.marketo.com/rest-api/endpoint-reference/asset-endpoint-reference/#!/Programs/browseProgramsUsingGET). + + +## Bulk export streams + +Connector also has bulk export streams, which support incremental sync. + +* [Activities\_X](https://developers.marketo.com/rest-api/endpoint-reference/lead-database-endpoint-reference/#!/Activities/getLeadActivitiesUsingGET). +* [Leads](https://developers.marketo.com/rest-api/endpoint-reference/lead-database-endpoint-reference/#!/Leads/getLeadByIdUsingGET). + +To be able to pull export data you need to generate 3 separate requests. See [Marketo docs](https://developers.marketo.com/rest-api/bulk-extract/bulk-lead-extract/). + +* [First](https://developers.marketo.com/rest-api/endpoint-reference/lead-database-endpoint-reference/#/Bulk_Export_Leads/createExportLeadsUsingPOST) - to create a job + +* [Second](https://developers.marketo.com/rest-api/endpoint-reference/lead-database-endpoint-reference/#/Bulk_Export_Leads/enqueueExportLeadsUsingPOST) - to enqueue job + +* [Third](https://developers.marketo.com/rest-api/endpoint-reference/lead-database-endpoint-reference/#!/Bulk_Export_Leads/getExportLeadsFileUsingGET) - to poll export data + +For get status of extracting see [Status](https://developers.marketo.com/rest-api/endpoint-reference/lead-database-endpoint-reference/#!/Bulk_Export_Leads/getExportLeadsStatusUsingGET) - the status is only updated once every 60 seconds. Job timeout - 180 min. + +Connector uses `createdAt` and `updatedAt` config for initial reports sync depend on connector and current date as an end data. + +Connector has `window_in_days` config which allows set the amount of days for each data-chunk begining from start_date. Default: 30 days. Max: 30 days. diff --git a/airbyte-integrations/connectors/source-marketo/source_marketo/spec.json b/airbyte-integrations/connectors/source-marketo/source_marketo/spec.json index acf51f5c0c6b5..585b81fe1c5f6 100644 --- a/airbyte-integrations/connectors/source-marketo/source_marketo/spec.json +++ b/airbyte-integrations/connectors/source-marketo/source_marketo/spec.json @@ -31,7 +31,7 @@ }, "window_in_days": { "type": "integer", - "description": "The amount of days for each data-chunk begining from start_date. Bigger the value - faster the fetch. (Min=1, as for a Day; Max=30, as for a Month).", + "description": "The amount of days for each data-chunk begining from start_date. (Min=1, as for a Day; Max=30, as for a Month).", "examples": [1, 5, 10, 15, 30], "default": 30 } From a99ad1826400df0a8fb2902389d44867fd685fc7 Mon Sep 17 00:00:00 2001 From: lazebnyi Date: Tue, 21 Sep 2021 17:08:56 +0300 Subject: [PATCH 5/7] Update to review --- .../connectors/source-marketo/Dockerfile | 33 ++++++-- .../source-marketo/acceptance-test-config.yml | 18 ++--- ....py::TestBasicRead::test_read[inputs0].txt | 0 ...efresh::test_sequential_reads[inputs0].txt | 0 .../connectors/source-marketo/build.gradle | 5 -- .../integration_tests/invalid_config.json | 2 +- .../integration_tests/sample_config.json | 2 +- .../schemas/activity_types.json | 22 +++--- .../source_marketo/schemas/campaigns.json | 22 +++--- .../source_marketo/schemas/leads.json | 8 +- .../source_marketo/schemas/lists.json | 12 +-- .../source_marketo/schemas/programs.json | 12 +-- .../source-marketo/source_marketo/source.py | 76 +++++++++++-------- .../source-marketo/source_marketo/spec.json | 4 +- .../source-marketo/source_marketo/utils.py | 12 ++- 15 files changed, 134 insertions(+), 94 deletions(-) delete mode 100644 airbyte-integrations/connectors/source-marketo/acceptance_tests_logs/test_core.py::TestBasicRead::test_read[inputs0].txt delete mode 100644 airbyte-integrations/connectors/source-marketo/acceptance_tests_logs/test_full_refresh.py::TestFullRefresh::test_sequential_reads[inputs0].txt diff --git a/airbyte-integrations/connectors/source-marketo/Dockerfile b/airbyte-integrations/connectors/source-marketo/Dockerfile index e9426b2decd80..03d0efdc52059 100644 --- a/airbyte-integrations/connectors/source-marketo/Dockerfile +++ b/airbyte-integrations/connectors/source-marketo/Dockerfile @@ -1,13 +1,34 @@ -FROM python:3.7-slim +FROM python:3.7.11-alpine3.14 as base -# Bash is installed for more convenient debugging. -RUN apt-get update && apt-get install -y bash && rm -rf /var/lib/apt/lists/* +# build and load all requirements +FROM base as builder +WORKDIR /airbyte/integration_code + +# upgrade pip to the latest version +RUN apk --no-cache upgrade \ + && pip install --upgrade pip \ + && apk --no-cache add tzdata +COPY setup.py ./ +# install necessary packages to a temporary folder +RUN pip install --prefix=/install . + +# build a clean environment +FROM base WORKDIR /airbyte/integration_code -COPY source_marketo ./source_marketo + +# copy all loaded and built libraries to a pure basic image +COPY --from=builder /install /usr/local +# add default timezone settings +COPY --from=builder /usr/share/zoneinfo/Etc/UTC /etc/localtime +RUN echo "Etc/UTC" > /etc/timezone + +# bash is installed for more convenient debugging. +RUN apk --no-cache add bash + +# copy payload code only COPY main.py ./ -COPY setup.py ./ -RUN pip install . +COPY source_marketo ./source_marketo ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] diff --git a/airbyte-integrations/connectors/source-marketo/acceptance-test-config.yml b/airbyte-integrations/connectors/source-marketo/acceptance-test-config.yml index 42a702315a868..e5dad4884f887 100644 --- a/airbyte-integrations/connectors/source-marketo/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-marketo/acceptance-test-config.yml @@ -11,17 +11,17 @@ tests: status: "failed" discovery: - config_path: "secrets/config.json" -# basic_read: -# - config_path: "secrets/config.json" -# configured_catalog_path: "integration_tests/configured_catalog.json" -# empty_streams: [] -# timeout_seconds: 3600 + basic_read: + - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" + empty_streams: [] + timeout_seconds: 3600 incremental: - config_path: "secrets/config.json" configured_catalog_path: "integration_tests/configured_catalog.json" future_state_path: "integration_tests/abnormal_state.json" timeout_seconds: 3600 -# full_refresh: -# - config_path: "secrets/config.json" -# configured_catalog_path: "integration_tests/configured_catalog.json" -# timeout_seconds: 3600 + full_refresh: + - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" + timeout_seconds: 3600 diff --git a/airbyte-integrations/connectors/source-marketo/acceptance_tests_logs/test_core.py::TestBasicRead::test_read[inputs0].txt b/airbyte-integrations/connectors/source-marketo/acceptance_tests_logs/test_core.py::TestBasicRead::test_read[inputs0].txt deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/airbyte-integrations/connectors/source-marketo/acceptance_tests_logs/test_full_refresh.py::TestFullRefresh::test_sequential_reads[inputs0].txt b/airbyte-integrations/connectors/source-marketo/acceptance_tests_logs/test_full_refresh.py::TestFullRefresh::test_sequential_reads[inputs0].txt deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/airbyte-integrations/connectors/source-marketo/build.gradle b/airbyte-integrations/connectors/source-marketo/build.gradle index 63759ef1c64df..8ce68796afd6b 100644 --- a/airbyte-integrations/connectors/source-marketo/build.gradle +++ b/airbyte-integrations/connectors/source-marketo/build.gradle @@ -7,8 +7,3 @@ plugins { airbytePython { moduleDirectory 'source_marketo' } - -dependencies { - implementation files(project(':airbyte-integrations:bases:source-acceptance-test').airbyteDocker.outputs) - implementation files(project(':airbyte-integrations:bases:base-python').airbyteDocker.outputs) -} diff --git a/airbyte-integrations/connectors/source-marketo/integration_tests/invalid_config.json b/airbyte-integrations/connectors/source-marketo/integration_tests/invalid_config.json index 416cd1af83c80..7497e3efaa8eb 100644 --- a/airbyte-integrations/connectors/source-marketo/integration_tests/invalid_config.json +++ b/airbyte-integrations/connectors/source-marketo/integration_tests/invalid_config.json @@ -1,7 +1,7 @@ { "client_id": "invalid_client_id", "client_secret": "invalid_client_secret", - "base_url": "invalid_base_url", + "domain_url": "invalid_domain_url", "start_date": "0000-00-00T00:00:00Z", "window_in_days": 2 } diff --git a/airbyte-integrations/connectors/source-marketo/integration_tests/sample_config.json b/airbyte-integrations/connectors/source-marketo/integration_tests/sample_config.json index 1beb13712a9b4..526966d7d36a8 100644 --- a/airbyte-integrations/connectors/source-marketo/integration_tests/sample_config.json +++ b/airbyte-integrations/connectors/source-marketo/integration_tests/sample_config.json @@ -1,7 +1,7 @@ { "client_id": "client_id", "client_secret": "client_secret", - "base_url": "base_url", + "domain_url": "domain_url", "start_date": "0000-00-00T00:00:00Z", "window_in_days": 2 } diff --git a/airbyte-integrations/connectors/source-marketo/source_marketo/schemas/activity_types.json b/airbyte-integrations/connectors/source-marketo/source_marketo/schemas/activity_types.json index dc114486d82d4..3c6a844618b2e 100644 --- a/airbyte-integrations/connectors/source-marketo/source_marketo/schemas/activity_types.json +++ b/airbyte-integrations/connectors/source-marketo/source_marketo/schemas/activity_types.json @@ -1,37 +1,37 @@ { - "type": "object", + "type": ["null", "object"], "additionalProperties": false, "properties": { "id": { - "type": "integer" + "type": ["null", "integer"] }, "name": { - "type": "string" + "type": ["null", "string"] }, "description": { - "type": ["string", "null"] + "type": ["null", "string"] }, "primaryAttribute": { - "type": ["object", "null"], + "type": ["null", "object"], "properties": { "name": { - "type": "string" + "type": ["null", "string"] }, "dataType": { - "type": "string" + "type": ["null", "string"] } } }, "attributes": { - "type": ["array", "null"], + "type": ["null", "array"], "items": { - "type": "object", + "type": ["null", "object"], "properties": { "name": { - "type": "string" + "type": ["null", "string"] }, "dataType": { - "type": "string" + "type": ["null", "string"] } } } diff --git a/airbyte-integrations/connectors/source-marketo/source_marketo/schemas/campaigns.json b/airbyte-integrations/connectors/source-marketo/source_marketo/schemas/campaigns.json index 33ef4be032b89..cf3fe20350759 100644 --- a/airbyte-integrations/connectors/source-marketo/source_marketo/schemas/campaigns.json +++ b/airbyte-integrations/connectors/source-marketo/source_marketo/schemas/campaigns.json @@ -1,38 +1,38 @@ { - "type": "object", + "type": ["null", "object"], "additionalProperties": false, "properties": { "id": { - "type": "integer" + "type": ["null", "integer"] }, "createdAt": { - "type": "string", + "type": ["null", "string"], "format": "date-time" }, "updatedAt": { - "type": "string", + "type": ["null", "string"], "format": "date-time" }, "active": { - "type": ["boolean", "null"] + "type": ["null", "boolean"] }, "description": { - "type": ["string", "null"] + "type": ["null", "string"] }, "name": { - "type": "string" + "type": ["null", "string"] }, "programId": { - "type": ["integer", "null"] + "type": ["null", "integer"] }, "programName": { - "type": ["string", "null"] + "type": ["null", "string"] }, "type": { - "type": "string" + "type": ["null", "string"] }, "workspaceName": { - "type": ["string", "null"] + "type": ["null", "string"] } } } diff --git a/airbyte-integrations/connectors/source-marketo/source_marketo/schemas/leads.json b/airbyte-integrations/connectors/source-marketo/source_marketo/schemas/leads.json index 566d75d85613d..eb1e2ca751e75 100644 --- a/airbyte-integrations/connectors/source-marketo/source_marketo/schemas/leads.json +++ b/airbyte-integrations/connectors/source-marketo/source_marketo/schemas/leads.json @@ -1,5 +1,5 @@ { - "type": "object", + "type": ["object", "null"], "additionalProperties": false, "properties": { "company": { @@ -48,7 +48,7 @@ "type": ["string", "null"] }, "id": { - "type": "integer" + "type": ["integer", "null"] }, "mktoName": { "type": ["string", "null"] @@ -215,11 +215,11 @@ "type": ["string", "null"] }, "createdAt": { - "type": "string", + "type": ["string", "null"], "format": "date-time" }, "updatedAt": { - "type": "string", + "type": ["string", "null"], "format": "date-time" }, "cookies": { diff --git a/airbyte-integrations/connectors/source-marketo/source_marketo/schemas/lists.json b/airbyte-integrations/connectors/source-marketo/source_marketo/schemas/lists.json index d240c421a3c3f..9c8d983e46309 100644 --- a/airbyte-integrations/connectors/source-marketo/source_marketo/schemas/lists.json +++ b/airbyte-integrations/connectors/source-marketo/source_marketo/schemas/lists.json @@ -1,19 +1,19 @@ { - "type": "object", + "type": ["object", "null"], "additionalProperties": false, "properties": { "id": { - "type": "integer" + "type": ["integer", "null"] }, "name": { - "type": "string" + "type": ["string", "null"] }, "createdAt": { - "type": "string", + "type": ["string", "null"], "format": "date-time" }, "updatedAt": { - "type": "string", + "type": ["string", "null"], "format": "date-time" }, "description": { @@ -26,7 +26,7 @@ "type": ["string", "null"] }, "workspaceId": { - "type": "integer" + "type": ["integer", "null"] } } } diff --git a/airbyte-integrations/connectors/source-marketo/source_marketo/schemas/programs.json b/airbyte-integrations/connectors/source-marketo/source_marketo/schemas/programs.json index d1c1be8181045..58071b52dcb4a 100644 --- a/airbyte-integrations/connectors/source-marketo/source_marketo/schemas/programs.json +++ b/airbyte-integrations/connectors/source-marketo/source_marketo/schemas/programs.json @@ -1,20 +1,20 @@ { - "type": "object", + "type": ["object", "null"], "additionalProperties": false, "properties": { "id": { - "type": "integer" + "type": ["integer", "null"] }, "createdAt": { - "type": "string", + "type": ["string", "null"], "format": "date-time" }, "updatedAt": { - "type": "string", + "type": ["string", "null"], "format": "date-time" }, "name": { - "type": "string" + "type": ["string", "null"] }, "description": { "type": ["null", "string"] @@ -35,7 +35,7 @@ "type": ["null", "string"] }, "folder": { - "type": "object", + "type": ["object", "null"], "properties": { "type": { "type": ["null", "string"] diff --git a/airbyte-integrations/connectors/source-marketo/source_marketo/source.py b/airbyte-integrations/connectors/source-marketo/source_marketo/source.py index 3cd6279fe4261..957a16138bd2f 100644 --- a/airbyte-integrations/connectors/source-marketo/source_marketo/source.py +++ b/airbyte-integrations/connectors/source-marketo/source_marketo/source.py @@ -50,7 +50,7 @@ def __init__(self, config: Mapping[str, Any], stream_name: str = None, param: Ma self.config = config self.start_date = config["start_date"] self.window_in_days = config["window_in_days"] - self._url_base = config["base_url"] + self._url_base = config["domain_url"] self.stream_name = stream_name self.param = param self.export_id = export_id @@ -68,12 +68,7 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, if next_page: return {"nextPageToken": next_page} - def request_params( - self, - stream_state: Mapping[str, Any], - stream_slice: Mapping[str, Any] = None, - next_page_token: Mapping[str, Any] = None, - ) -> MutableMapping[str, Any]: + def request_params(self, next_page_token: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]: params = {"batchSize": self.page_size} if next_page_token: params.update(**next_page_token) @@ -107,7 +102,9 @@ def parse_response(self, response: requests.Response, stream_state: Mapping[str, def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: return { - self.cursor_field: max(latest_record.get(self.cursor_field, ""), current_stream_state.get(self.cursor_field, self.start_date)) + self.cursor_field: max( + latest_record.get(self.cursor_field, self.start_date), current_stream_state.get(self.cursor_field, self.start_date) + ) } def stream_slices(self, sync_mode, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, any]]]: @@ -136,7 +133,9 @@ def stream_slices(self, sync_mode, stream_state: Mapping[str, Any] = None, **kwa date_slices = [] while start_date <= end_date: + # the amount of days for each data-chunk begining from start_date end_date_slice = start_date.add(days=self.window_in_days) + date_slice = {"startAt": to_datetime_str(start_date), "endAt": to_datetime_str(end_date_slice)} date_slices.append(date_slice) @@ -150,7 +149,8 @@ class MarketoExportBase(IncrementalMarketoStream): Base class for all the streams which support bulk extract. """ - job_timeout = 60 * 180 + # Polling Job Status - https://developers.marketo.com/rest-api/bulk-extract/bulk-lead-extract/ + # The status is only updated once every 60 seconds poll_interval = 60 def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: @@ -164,6 +164,19 @@ def stream_fields(self): def stream_filter(self): return {} + def create_export(self, param): + return next(MarketoExportCreate(self.config, stream_name=self.stream_name, param=param).read_records(sync_mode=None), {}) + + def start_export(self, stream_slice): + return next( + MarketoExportStart(self.config, stream_name=self.stream_name, export_id=stream_slice["id"]).read_records(sync_mode=None) + ) + + def get_export_status(self, stream_slice): + return next( + MarketoExportStatus(self.config, stream_name=self.stream_name, export_id=stream_slice["id"]).read_records(sync_mode=None) + ) + def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: return f"/bulk/v1/{self.stream_name}/export/{stream_slice['id']}/file.json" @@ -175,25 +188,20 @@ def stream_slices(self, sync_mode, stream_state: Mapping[str, Any] = None, **kwa param["fields"].extend(self.stream_fields) param["filter"].update(self.stream_filter) - export = next(MarketoExportCreate(self.config, stream_name=self.stream_name, param=param).read_records(sync_mode=None), {}) + export = self.create_export(param) date_slice["id"] = export["exportId"] return date_slices def sleep_till_export_completed(self, stream_slice: Mapping[str, Any]) -> bool: - timeout_time = pendulum.now().add(seconds=self.job_timeout) - while pendulum.now() < timeout_time: - status = next( - MarketoExportStatus(self.config, stream_name=self.stream_name, export_id=stream_slice["id"]).read_records(sync_mode=None) - ) + while True: + status = self.get_export_status(stream_slice) self.logger.info(f"Export {self.name} from {stream_slice['startAt']} to {stream_slice['endAt']} status is {status}") if status == "Created": # If the status is created, the export has been made but # not started, so enqueue the export. - next( - MarketoExportStart(self.config, stream_name=self.stream_name, export_id=stream_slice["id"]).read_records(sync_mode=None) - ) + self.start_export(stream_slice) elif status in ["Cancelled", "Failed"]: # Cancelled and failed exports fail the current sync. @@ -204,9 +212,16 @@ def sleep_till_export_completed(self, stream_slice: Mapping[str, Any]) -> bool: sleep(self.poll_interval) - raise Exception(f"Export timed out after {self.job_timeout / 60} minutes") - def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + """ + response.text example: + + firstName,lastName,email,cookies + Russell,Wilson,null,_mch-localhost-1536605780000-12105 + + :return an iterable containing each record in the response + """ + list_response = response.text.rstrip("\n").split("\n") headers = list_response[0].split(",") @@ -310,7 +325,7 @@ class Leads(MarketoExportBase): cursor_field = "updatedAt" def __init__(self, config: Mapping[str, Any]): - super().__init__(config, "leads") + super().__init__(config, self.name) @property def stream_fields(self): @@ -400,7 +415,10 @@ class Programs(IncrementalMarketoStream): cursor_field = "updatedAt" page_size = 200 - offset = 0 + + def __init__(self, config: Mapping[str, Any]): + super().__init__(config) + self.offset = 0 def path(self, **kwargs) -> str: return f"/rest/asset/v1/{self.name}.json" @@ -424,7 +442,7 @@ def request_params( query from the bookmark value until current. """ - params = super().request_params(stream_state, stream_slice, next_page_token) + params = super().request_params(next_page_token, stream_state=stream_state, stream_slice=stream_slice) params.update( { "maxReturn": self.page_size, @@ -437,7 +455,7 @@ def request_params( def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]: for record in super().parse_response(response, stream_state, **kwargs): - record["updatedAt"] = record["updatedAt"][:-5] + record["updatedAt"] = record["updatedAt"][:-5] # delete +00:00 part from the end of updatedAt yield record @@ -447,8 +465,6 @@ class Campaigns(IncrementalMarketoStream): API Docs: http://developers.marketo.com/rest-api/endpoint-reference/lead-database-endpoint-reference/#!/Campaigns/getCampaignsUsingGET """ - pass - class Lists(IncrementalMarketoStream): """ @@ -456,13 +472,11 @@ class Lists(IncrementalMarketoStream): API Docs: http://developers.marketo.com/rest-api/endpoint-reference/lead-database-endpoint-reference/#!/Static_Lists/getListsUsingGET """ - pass - class MarketoAuthenticator(Oauth2Authenticator): def __init__(self, config): super().__init__( - token_refresh_endpoint=f"{config['base_url']}/identity/oauth/token", + token_refresh_endpoint=f"{config['domain_url']}/identity/oauth/token", client_id=config["client_id"], client_secret=config["client_secret"], refresh_token=None, @@ -485,7 +499,6 @@ def refresh_access_token(self) -> Tuple[str, int]: response = requests.request(method="GET", url=self.token_refresh_endpoint, params=self.get_refresh_request_params()) response.raise_for_status() response_json = response.json() - # print(response_json["access_token"]) return response_json["access_token"], response_json["expires_in"] except Exception as e: raise Exception(f"Error while refreshing access token: {e}") from e @@ -502,7 +515,7 @@ def check_connection(self, logger, config) -> Tuple[bool, any]: """ try: - url = f"{config['base_url']}/rest/v1/leads/describe" + url = f"{config['domain_url']}/rest/v1/leads/describe" authenticator = MarketoAuthenticator(config) @@ -518,6 +531,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: streams = [ActivityTypes(config), Campaigns(config), Leads(config), Lists(config), Programs(config)] + # create dynamically activities by activity type id for activity in ActivityTypes(config).read_records(sync_mode=None): stream_name = f"activities_{clean_string(activity['name'])}" diff --git a/airbyte-integrations/connectors/source-marketo/source_marketo/spec.json b/airbyte-integrations/connectors/source-marketo/source_marketo/spec.json index 585b81fe1c5f6..5ffeeeb37e58d 100644 --- a/airbyte-integrations/connectors/source-marketo/source_marketo/spec.json +++ b/airbyte-integrations/connectors/source-marketo/source_marketo/spec.json @@ -4,10 +4,10 @@ "$schema": "http://json-schema.org/draft-07/schema#", "title": "Source Marketo Spec", "type": "object", - "required": ["base_url", "client_id", "client_secret", "start_date"], + "required": ["domain_url", "client_id", "client_secret", "start_date"], "additionalProperties": false, "properties": { - "base_url": { + "domain_url": { "type": "string", "description": "Your Marketo Base URL. See the docs for info on how to obtain this.", "examples": ["https://000-AAA-000.mktorest.com"], diff --git a/airbyte-integrations/connectors/source-marketo/source_marketo/utils.py b/airbyte-integrations/connectors/source-marketo/source_marketo/utils.py index 5847c0ffee6e1..0771066fc7081 100644 --- a/airbyte-integrations/connectors/source-marketo/source_marketo/utils.py +++ b/airbyte-integrations/connectors/source-marketo/source_marketo/utils.py @@ -46,6 +46,17 @@ def to_datetime_str(date: datetime) -> str: def clean_string(string: str) -> str: + """ + input -> output + "updatedAt" -> "updated_at" + "UpdatedAt" -> "updated_at" + "base URL" -> "base_url" + "UPdatedAt" -> "u_pdated_at" + "updated_at" -> "updated_at" + " updated_at " -> "updated_at" + "updatedat" -> "updatedat" + """ + abbreviations = ("URL", "GUID", "IP") if any(map(lambda w: w in string, abbreviations)): return string.lower().replace(" ", "_") @@ -67,7 +78,6 @@ def format_value(value, schema): # Custom Marketo percent type fields can have decimals, so we drop them decimal_index = value.find(".") if decimal_index > 0: - # singer.log_warning("Dropping decimal from integer type. Original Value: %s", value) value = value[:decimal_index] return int(value) elif "string" in field_type: From 7e9fcb15f30fa4a41b5d8b2a735e760225ef77dd Mon Sep 17 00:00:00 2001 From: lazebnyi Date: Thu, 23 Sep 2021 13:18:44 +0300 Subject: [PATCH 6/7] Fix typo --- tools/bin/ci_credentials.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/bin/ci_credentials.sh b/tools/bin/ci_credentials.sh index ec2ca5ae0e85a..b4021dcd7cdc2 100755 --- a/tools/bin/ci_credentials.sh +++ b/tools/bin/ci_credentials.sh @@ -78,7 +78,7 @@ write_standard_creds source-jira "$JIRA_INTEGRATION_TEST_CREDS" write_standard_creds source-klaviyo "$KLAVIYO_TEST_CREDS" write_standard_creds source-looker "$LOOKER_INTEGRATION_TEST_CREDS" write_standard_creds source-mailchimp "$MAILCHIMP_TEST_CREDS" -write_standard_creds source-marketo"$SOURCE_MARKETO_TEST_CREDS" +write_standard_creds source-marketo "$SOURCE_MARKETO_TEST_CREDS" write_standard_creds source-marketo-singer "$SOURCE_MARKETO_SINGER_INTEGRATION_TEST_CONFIG" write_standard_creds source-microsoft-teams "$MICROSOFT_TEAMS_TEST_CREDS" write_standard_creds source-mixpanel "$MIXPANEL_INTEGRATION_TEST_CREDS" From 47a9d040fa54d1b94cd607c4ee9bf87c267d0764 Mon Sep 17 00:00:00 2001 From: lazebnyi Date: Thu, 23 Sep 2021 14:57:28 +0300 Subject: [PATCH 7/7] Add gcc to docker --- airbyte-integrations/connectors/source-marketo/Dockerfile | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-marketo/Dockerfile b/airbyte-integrations/connectors/source-marketo/Dockerfile index 03d0efdc52059..45142d197afb4 100644 --- a/airbyte-integrations/connectors/source-marketo/Dockerfile +++ b/airbyte-integrations/connectors/source-marketo/Dockerfile @@ -7,7 +7,8 @@ WORKDIR /airbyte/integration_code # upgrade pip to the latest version RUN apk --no-cache upgrade \ && pip install --upgrade pip \ - && apk --no-cache add tzdata + && apk --no-cache add tzdata \ + && apk --no-cache add build-base COPY setup.py ./ # install necessary packages to a temporary folder