diff --git a/airbyte-integrations/connectors/source-mailchimp/.coveragerc b/airbyte-integrations/connectors/source-mailchimp/.coveragerc new file mode 100644 index 0000000000000..e8793cf04be38 --- /dev/null +++ b/airbyte-integrations/connectors/source-mailchimp/.coveragerc @@ -0,0 +1,3 @@ +[run] +omit = + source_mailchimp/run.py \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-mailchimp/acceptance-test-config.yml b/airbyte-integrations/connectors/source-mailchimp/acceptance-test-config.yml index c5aeffc15774c..8ded4d12f3e70 100644 --- a/airbyte-integrations/connectors/source-mailchimp/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-mailchimp/acceptance-test-config.yml @@ -12,12 +12,9 @@ acceptance_tests: # for auth with oauth2 token - config_path: "secrets/config_oauth.json" status: "succeed" - - config_path: "integration_tests/invalid_config.json" - status: "failed" - config_path: "integration_tests/invalid_config_apikey.json" status: "failed" - - config_path: "integration_tests/invalid_config_oauth.json" - status: "failed" + timeout_seconds: 300 discovery: tests: # for auth with API token @@ -33,24 +30,15 @@ acceptance_tests: empty_streams: - name: "automations" bypass_reason: "Cannot seed in free sandbox account, need to upgrade to paid account." - - config_path: "secrets/config_oauth.json" - expect_records: - path: "integration_tests/expected_records.jsonl" - fail_on_extra_columns: false - empty_streams: - - name: "automations" - bypass_reason: "Cannot seed in free sandbox account, need to upgrade to paid account." incremental: tests: - config_path: "secrets/config.json" configured_catalog_path: "integration_tests/configured_catalog.json" future_state: - future_state_path: "integration_tests/state.json" + future_state_path: "integration_tests/abnormal_state.json" # Email activities stream has working campaigns with email newsletters. # Due to this, the sequential_reads test could fail. 
full_refresh: tests: - config_path: "secrets/config.json" configured_catalog_path: "integration_tests/configured_catalog_without_email_activities.json" - - config_path: "secrets/config_oauth.json" - configured_catalog_path: "integration_tests/configured_catalog_without_email_activities.json" diff --git a/airbyte-integrations/connectors/source-mailchimp/integration_tests/abnormal_state.json b/airbyte-integrations/connectors/source-mailchimp/integration_tests/abnormal_state.json new file mode 100644 index 0000000000000..80f92d1400fa3 --- /dev/null +++ b/airbyte-integrations/connectors/source-mailchimp/integration_tests/abnormal_state.json @@ -0,0 +1,370 @@ +[ + { + "type": "STREAM", + "stream": { + "stream_state": { + "create_time": "2220-11-23T05:42:11+00:00" + }, + "stream_descriptor": { + "name": "campaigns" + } + } + }, + { + "type": "STREAM", + "stream": { + "stream_state": { + "date_created": "2220-09-25T04:47:31+00:00" + }, + "stream_descriptor": { + "name": "lists" + } + } + }, + { + "type": "STREAM", + "stream": { + "stream_state": { + "create_time": "2220-11-23T05:42:11+00:00" + }, + "stream_descriptor": { + "name": "automations" + } + } + }, + { + "type": "STREAM", + "stream": { + "stream_state": { + "states": [ + { + "partition": { + "id": "324b8a398e", + "parent_slice": {} + }, + "cursor": { + "timestamp": "2230-11-23T05:42:10+0000" + } + }, + { + "partition": { + "id": "3cbed9a0fc", + "parent_slice": {} + }, + "cursor": { + "timestamp": "2230-11-23T05:42:10+0000" + } + }, + { + "partition": { + "id": "2aa901afd0", + "parent_slice": {} + }, + "cursor": { + "timestamp": "2230-11-23T05:42:10+0000" + } + }, + { + "partition": { + "id": "e974db8443", + "parent_slice": {} + }, + "cursor": { + "timestamp": "2230-11-23T05:42:10+0000" + } + }, + { + "partition": { + "id": "a79651273b", + "parent_slice": {} + }, + "cursor": { + "timestamp": "2230-11-23T05:42:10+0000" + } + }, + { + "partition": { + "id": "d983b83b95", + "parent_slice": {} + }, + "cursor": { + "timestamp": "2230-11-23T05:42:10+0000" + } + }, + { + "partition": { + "id": "7847cdaeff", + "parent_slice": {} + }, + "cursor": { + "timestamp": "2230-11-23T05:42:10+0000" + } + } + ] + }, + "stream_descriptor": { + "name": "email_activity" + } + } + }, + { + "type": "STREAM", + "stream": { + "stream_state": { + "states": [ + { + "partition": { + "id": "16d6ec4ffc", + "parent_slice": {} + }, + "cursor": { + "last_changed": "2230-02-26T05:42:10.000000Z" + } + } + ] + }, + "stream_descriptor": { + "name": "list_members" + } + } + }, + { + "type": "STREAM", + "stream": { + "stream_state": { + "send_time": "2230-02-26T05:42:10+00:00" + }, + "stream_descriptor": { + "name": "reports" + } + } + }, + { + "type": "STREAM", + "stream": { + "stream_state": { + "states": [ + { + "partition": { + "id": 13506120, + "parent_slice": { + "id": "16d6ec4ffc", + "parent_slice": {} + } + }, + "cursor": { + "last_changed": "2222-12-27T08:34:39+0000" + } + }, + { + "partition": { + "id": 13506124, + "parent_slice": { + "id": "16d6ec4ffc", + "parent_slice": {} + } + }, + "cursor": { + "last_changed": "2222-12-27T08:34:39+0000" + } + }, + { + "partition": { + "id": 13506128, + "parent_slice": { + "id": "16d6ec4ffc", + "parent_slice": {} + } + }, + "cursor": { + "last_changed": "2222-12-27T08:34:39+0000" + } + }, + { + "partition": { + "id": 13506132, + "parent_slice": { + "id": "16d6ec4ffc", + "parent_slice": {} + } + }, + "cursor": { + "last_changed": "2222-12-27T08:34:39+0000" + } + }, + { + "partition": { + "id": 13506136, + "parent_slice": { + "id": 
"16d6ec4ffc", + "parent_slice": {} + } + }, + "cursor": { + "last_changed": "2222-12-27T08:34:39+0000" + } + }, + { + "partition": { + "id": 14351124, + "parent_slice": { + "id": "16d6ec4ffc", + "parent_slice": {} + } + }, + "cursor": { + "last_changed": "2222-12-27T08:34:39+0000" + } + }, + { + "partition": { + "id": 14351128, + "parent_slice": { + "id": "16d6ec4ffc", + "parent_slice": {} + } + }, + "cursor": { + "last_changed": "2222-12-27T08:34:39+0000" + } + }, + { + "partition": { + "id": 14351488, + "parent_slice": { + "id": "16d6ec4ffc", + "parent_slice": {} + } + }, + "cursor": { + "last_changed": "2222-12-27T08:34:39+0000" + } + }, + { + "partition": { + "id": 14351504, + "parent_slice": { + "id": "16d6ec4ffc", + "parent_slice": {} + } + }, + "cursor": { + "last_changed": "2222-12-27T08:34:39+0000" + } + }, + { + "partition": { + "id": 14351532, + "parent_slice": { + "id": "16d6ec4ffc", + "parent_slice": {} + } + }, + "cursor": { + "last_changed": "2222-12-27T08:34:39+0000" + } + } + ] + }, + "stream_descriptor": { + "name": "segment_members" + } + } + }, + { + "type": "STREAM", + "stream": { + "stream_state": { + "states": [ + { + "partition": { + "id": "16d6ec4ffc", + "parent_slice": {} + }, + "cursor": { + "updated_at": "2230-02-26T05:42:10Z" + } + } + ] + }, + "stream_descriptor": { + "name": "segments" + } + } + }, + { + "type": "STREAM", + "stream": { + "stream_state": { + "states": [ + { + "partition": { + "id": "324b8a398e", + "parent_slice": {} + }, + "cursor": { + "timestamp": "2231-09-26T05:42:10+0000" + } + }, + { + "partition": { + "id": "3cbed9a0fc", + "parent_slice": {} + }, + "cursor": { + "timestamp": "2231-09-26T05:42:10+0000" + } + }, + { + "partition": { + "id": "2aa901afd0", + "parent_slice": {} + }, + "cursor": { + "timestamp": "2231-09-26T05:42:10+0000" + } + }, + { + "partition": { + "id": "e974db8443", + "parent_slice": {} + }, + "cursor": { + "timestamp": "2231-09-26T05:42:10+0000" + } + }, + { + "partition": { + "id": "a79651273b", + "parent_slice": {} + }, + "cursor": { + "timestamp": "2231-09-26T05:42:10+0000" + } + }, + { + "partition": { + "id": "d983b83b95", + "parent_slice": {} + }, + "cursor": { + "timestamp": "2231-09-26T05:42:10+0000" + } + }, + { + "partition": { + "id": "7847cdaeff", + "parent_slice": {} + }, + "cursor": { + "timestamp": "2231-09-26T05:42:10+0000" + } + } + ] + }, + "stream_descriptor": { + "name": "unsubscribes" + } + } + } +] diff --git a/airbyte-integrations/connectors/source-mailchimp/integration_tests/state.json b/airbyte-integrations/connectors/source-mailchimp/integration_tests/state.json deleted file mode 100644 index 26b656926fd56..0000000000000 --- a/airbyte-integrations/connectors/source-mailchimp/integration_tests/state.json +++ /dev/null @@ -1,80 +0,0 @@ -[ - { - "type": "STREAM", - "stream": { - "stream_state": { "create_time": "2220-11-23T05:42:11+00:00" }, - "stream_descriptor": { "name": "campaigns" } - } - }, - { - "type": "STREAM", - "stream": { - "stream_state": { "date_created": "2220-09-25T04:47:31+00:00" }, - "stream_descriptor": { "name": "lists" } - } - }, - { - "type": "STREAM", - "stream": { - "stream_state": { "create_time": "2220-11-23T05:42:11+00:00" }, - "stream_descriptor": { "name": "automations" } - } - }, - { - "type": "STREAM", - "stream": { - "stream_state": { - "7847cdaeff": { "timestamp": "2230-11-23T05:42:10+00:00" } - }, - "stream_descriptor": { "name": "email_activity" } - } - }, - { - "type": "STREAM", - "stream": { - "stream_state": { - "16d6ec4ffc": { "last_changed": 
"2230-02-26T05:42:10+00:00" } - }, - "stream_descriptor": { "name": "list_members" } - } - }, - { - "type": "STREAM", - "stream": { - "stream_state": { "send_time": "2230-02-26T05:42:10+00:00" }, - "stream_descriptor": { "name": "reports" } - } - }, - { - "type": "STREAM", - "stream": { - "stream_state": { - "13506120": { "last_changed": "2222-12-27T08:34:39+00:00" }, - "13506136": { "last_changed": "2222-12-27T08:34:39+00:00" }, - "14351124": { "last_changed": "2222-12-27T08:34:39+00:00" }, - "14351504": { "last_changed": "2222-12-27T07:56:47+00:00" }, - "14351128": { "last_changed": "2222-12-27T08:34:39+00:00" }, - "13506132": { "last_changed": "2222-12-27T08:34:39+00:00" } - }, - "stream_descriptor": { "name": "segment_members" } - } - }, - { - "type": "STREAM", - "stream": { - "stream_state": { - "16d6ec4ffc": { "updated_at": "2230-02-26T05:42:10+00:00" } - }, - "stream_descriptor": { "name": "segments" } - } - }, - { - "type": "STREAM", - "stream": { - "stream_state": { - "7847cdaeff": { "timestamp": "2231-09-26T05:42:10+00:00" } - }, - "stream_descriptor": { "name": "unsubscribes" } - } - } -] diff --git a/airbyte-integrations/connectors/source-mailchimp/metadata.yaml b/airbyte-integrations/connectors/source-mailchimp/metadata.yaml index eb22bc1ef31e9..f54f394d51566 100644 --- a/airbyte-integrations/connectors/source-mailchimp/metadata.yaml +++ b/airbyte-integrations/connectors/source-mailchimp/metadata.yaml @@ -5,12 +5,13 @@ data: allowedHosts: hosts: - "*.api.mailchimp.com" + - "login.mailchimp.com" connectorBuildOptions: - baseImage: docker.io/airbyte/python-connector-base:1.1.0@sha256:bd98f6505c6764b1b5f99d3aedc23dfc9e9af631a62533f60eb32b1d3dbab20c + baseImage: docker.io/airbyte/python-connector-base:1.2.0@sha256:c22a9d97464b69d6ef01898edf3f8612dc11614f05a84984451dde195f337db9 connectorSubtype: api connectorType: source definitionId: b03a9f3e-22a5-11eb-adc1-0242ac120002 - dockerImageTag: 1.2.0 + dockerImageTag: 2.0.0 dockerRepository: airbyte/source-mailchimp documentationUrl: https://docs.airbyte.com/integrations/sources/mailchimp githubIssueLabel: source-mailchimp @@ -28,6 +29,12 @@ data: enabled: true releases: breakingChanges: + 2.0.0: + message: The source Mailchimp connector is being migrated from the Python CDK to our declarative low-code CDK. Due to changes in primary key for streams `Segment Members` and `List Members`, this migration constitutes a breaking change. After updating, please reset your source before resuming syncs. For more information, see our migration documentation for source Mailchimp. + upgradeDeadline: "2024-04-10" + scopedImpact: + - scopeType: stream + impactedScopes: ["segment_members", "list_members"] 1.0.0: message: Version 1.0.0 introduces schema changes to all incremental streams. @@ -44,5 +51,5 @@ data: supportLevel: certified tags: - language:python - - cdk:python + - cdk:low-code metadataSpecVersion: "1.0" diff --git a/airbyte-integrations/connectors/source-mailchimp/poetry.lock b/airbyte-integrations/connectors/source-mailchimp/poetry.lock index af61bd6aa380e..58ab3dcc25b53 100644 --- a/airbyte-integrations/connectors/source-mailchimp/poetry.lock +++ b/airbyte-integrations/connectors/source-mailchimp/poetry.lock @@ -1,14 +1,14 @@ -# This file is automatically @generated by Poetry 1.6.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand. [[package]] name = "airbyte-cdk" -version = "0.77.2" +version = "0.78.1" description = "A framework for writing Airbyte Connectors." 
optional = false python-versions = "<4.0,>=3.9" files = [ - {file = "airbyte_cdk-0.77.2-py3-none-any.whl", hash = "sha256:6dffbe0c4b3454a5cdd20525b4f1e9cfef2e80c005b6b30473fc5bf6f75af64e"}, - {file = "airbyte_cdk-0.77.2.tar.gz", hash = "sha256:84aeb27862a18e135c7bc3a5dfc363037665d428e7495e8824673f853adcca70"}, + {file = "airbyte_cdk-0.78.1-py3-none-any.whl", hash = "sha256:73dfc03e55a7107bf28b5bbc4e43572d448c60e9b34368d22cf48b6536aa2263"}, + {file = "airbyte_cdk-0.78.1.tar.gz", hash = "sha256:700e5526ae29db1e453b3def8682726f7d8aa653ee2f3056488d0a484f055133"}, ] [package.dependencies] @@ -300,6 +300,20 @@ files = [ [package.extras] test = ["pytest (>=6)"] +[[package]] +name = "freezegun" +version = "1.4.0" +description = "Let your Python tests travel through time" +optional = false +python-versions = ">=3.7" +files = [ + {file = "freezegun-1.4.0-py3-none-any.whl", hash = "sha256:55e0fc3c84ebf0a96a5aa23ff8b53d70246479e9a68863f1fcac5a3e52f19dd6"}, + {file = "freezegun-1.4.0.tar.gz", hash = "sha256:10939b0ba0ff5adaecf3b06a5c2f73071d9678e507c5eaedb23c761d56ac774b"}, +] + +[package.dependencies] +python-dateutil = ">=2.7" + [[package]] name = "genson" version = "1.2.2" @@ -837,13 +851,13 @@ yaml = ["pyyaml (>=6.0.1)"] [[package]] name = "requests-mock" -version = "1.12.0" +version = "1.12.1" description = "Mock out responses from the requests package" optional = false -python-versions = "*" +python-versions = ">=3.5" files = [ - {file = "requests-mock-1.12.0.tar.gz", hash = "sha256:4e34f2a2752f0b78397fb414526605d95fcdeab021ac1f26d18960e7eb41f6a8"}, - {file = "requests_mock-1.12.0-py2.py3-none-any.whl", hash = "sha256:4f6fdf956de568e0bac99eee4ad96b391c602e614cc0ad33e7f5c72edd699e70"}, + {file = "requests-mock-1.12.1.tar.gz", hash = "sha256:e9e12e333b525156e82a3c852f22016b9158220d2f47454de9cae8a77d371401"}, + {file = "requests_mock-1.12.1-py2.py3-none-any.whl", hash = "sha256:b1e37054004cdd5e56c84454cc7df12b25f90f382159087f4b6915aaeef39563"}, ] [package.dependencies] @@ -852,24 +866,6 @@ requests = ">=2.22,<3" [package.extras] fixture = ["fixtures"] -[[package]] -name = "responses" -version = "0.19.0" -description = "A utility library for mocking out the `requests` Python library." 
-optional = false -python-versions = ">=3.7" -files = [ - {file = "responses-0.19.0-py3-none-any.whl", hash = "sha256:53354b5de163aa2074312c71d8ebccb8bd1ab336cff7053abb75e84dc5637abe"}, - {file = "responses-0.19.0.tar.gz", hash = "sha256:3fc29c3117e14136b833a0a6d4e7f1217c6301bf08b6086db468e12f1e3290e2"}, -] - -[package.dependencies] -requests = ">=2.0,<3.0" -urllib3 = ">=1.25.10" - -[package.extras] -tests = ["coverage (>=6.0.0)", "flake8", "mypy", "pytest (>=7.0.0)", "pytest-asyncio", "pytest-cov", "pytest-localserver", "types-mock", "types-requests"] - [[package]] name = "setuptools" version = "69.2.0" @@ -1046,4 +1042,4 @@ files = [ [metadata] lock-version = "2.0" python-versions = "^3.9,<3.12" -content-hash = "fe4e36826d87d9a7d36cd58f7f2ede200bffa315a42b37cf046fc495a689984c" +content-hash = "987a9fd3716b6001482423ffd138cfe7a77609236390f1a48a686daebf28ac68" diff --git a/airbyte-integrations/connectors/source-mailchimp/pyproject.toml b/airbyte-integrations/connectors/source-mailchimp/pyproject.toml index c3a68b066b60c..f5d046778126d 100644 --- a/airbyte-integrations/connectors/source-mailchimp/pyproject.toml +++ b/airbyte-integrations/connectors/source-mailchimp/pyproject.toml @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",] build-backend = "poetry.core.masonry.api" [tool.poetry] -version = "1.2.0" +version = "2.0.0" name = "source-mailchimp" description = "Source implementation for Mailchimp." authors = [ "Airbyte <contact@airbyte.io>",] @@ -25,5 +25,5 @@ source-mailchimp = "source_mailchimp.run:run" [tool.poetry.group.dev.dependencies] pytest-mock = "^3.6.1" -responses = "^0.19.0" requests-mock = "^1.9.3" +freezegun = "^1.4.0" diff --git a/airbyte-integrations/connectors/source-mailchimp/source_mailchimp/components.py b/airbyte-integrations/connectors/source-mailchimp/source_mailchimp/components.py new file mode 100644 index 0000000000000..b1f1c2733159c --- /dev/null +++ b/airbyte-integrations/connectors/source-mailchimp/source_mailchimp/components.py @@ -0,0 +1,55 @@ +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. + +from dataclasses import InitVar +from typing import Any, List, Mapping, Optional + +import pendulum +import requests +from airbyte_cdk.sources.declarative.extractors import DpathExtractor +from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter +from airbyte_cdk.sources.declarative.types import StreamSlice, StreamState + + +class MailChimpRecordFilter(RecordFilter): + """ + Filter applied on a list of Records. + """ + + parameters: InitVar[Mapping[str, Any]] + + def __post_init__(self, parameters: Mapping[str, Any]) -> None: + self.parameters = parameters + + def filter_records( + self, + records: List[Mapping[str, Any]], + stream_state: StreamState, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> List[Mapping[str, Any]]: + current_state = [x for x in stream_state.get("states", []) if x["partition"]["id"] == stream_slice.partition["id"]] + cursor_value = self.get_filter_date(self.config.get("start_date"), current_state) + return [record for record in records if record[self.parameters["cursor_field"]] > cursor_value] if cursor_value else records + + def get_filter_date(self, start_date: str, state_value: list) -> str: + """ + Calculate the filter date to pass in the request parameters by comparing the start_date + with the cursor value obtained from the stream state. + If both dates are present, use the most recent one; if only one exists, use it by default; if neither exists, return None.
+ If no filter_date is provided, the API will fetch all available records. + """ + + start_date_parsed = pendulum.parse(start_date).to_iso8601_string() if start_date else None + state_date_parsed = ( + pendulum.parse(state_value[0]["cursor"][self.parameters["cursor_field"]]).to_iso8601_string() if state_value else None + ) + + # Return the max of the two dates if both are present. Otherwise return whichever is present, or None. + if start_date_parsed or state_date_parsed: + return max(filter(None, [start_date_parsed, state_date_parsed]), default=None) + + +class MailChimpRecordExtractorEmailActivity(DpathExtractor): + def extract_records(self, response: requests.Response) -> List[Mapping[str, Any]]: + records = super().extract_records(response=response) + return [{**record, **activity_item} for record in records for activity_item in record.pop("activity", [])] diff --git a/airbyte-integrations/connectors/source-mailchimp/source_mailchimp/config_migrations.py b/airbyte-integrations/connectors/source-mailchimp/source_mailchimp/config_migrations.py new file mode 100644 index 0000000000000..621edd5763f11 --- /dev/null +++ b/airbyte-integrations/connectors/source-mailchimp/source_mailchimp/config_migrations.py @@ -0,0 +1,96 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + + +import logging +from typing import Any, List, Mapping + +import requests +from airbyte_cdk.config_observation import create_connector_config_control_message +from airbyte_cdk.entrypoint import AirbyteEntrypoint +from airbyte_cdk.sources import Source +from airbyte_cdk.utils import AirbyteTracedException +from airbyte_protocol.models import FailureType + +logger = logging.getLogger("airbyte_logger") + + +class MigrateDataCenter: + """ + This class is responsible for migrating the config at runtime, + setting the data_center property in the config based on the credential type. + """ + + @classmethod + def get_data_center_location(cls, config: Mapping[str, Any]) -> Mapping[str, Any]: + if config.get("credentials", {}).get("auth_type") == "apikey": + data_center = config["credentials"]["apikey"].split("-").pop() + else: + data_center = cls.get_oauth_data_center(config["credentials"]["access_token"]) + config["data_center"] = data_center + return config + + @staticmethod + def get_oauth_data_center(access_token: str) -> str: + """ + Every Mailchimp API request must be sent to a specific data center. + The data center is already embedded in API keys, but not OAuth access tokens. + This method retrieves the data center for OAuth credentials. + """ + response = requests.get("https://login.mailchimp.com/oauth2/metadata", headers={"Authorization": "OAuth {}".format(access_token)}) + + # Requests to this endpoint will return a 200 status code even if the access token is invalid. + error = response.json().get("error") + if error == "invalid_token": + raise AirbyteTracedException( + failure_type=FailureType.config_error, + internal_message=error, + message="The access token you provided was invalid. Please check your credentials and try again.", + ) + return response.json()["dc"] + + @classmethod + def modify_and_save(cls, config_path: str, source: Source, config: Mapping[str, Any]) -> Mapping[str, Any]: + """ + Modifies the configuration and then saves it back to the source. + + Args: + - config_path (str): The path where the configuration is stored. + - source (Source): The data source. + - config (Mapping[str, Any]): The current configuration. + + Returns: + - Mapping[str, Any]: The updated configuration. 
+ """ + migrated_config = cls.get_data_center_location(config) + source.write_config(migrated_config, config_path) + return migrated_config + + @classmethod + def emit_control_message(cls, migrated_config: Mapping[str, Any]) -> None: + """ + Emits the control messages related to configuration migration. + + Args: + - migrated_config (Mapping[str, Any]): The migrated configuration. + """ + print(create_connector_config_control_message(migrated_config).json(exclude_unset=True)) + + @classmethod + def migrate(cls, args: List[str], source: Source) -> None: + """ + Orchestrates the configuration migration process. + + It first checks if the `--config` argument is provided, and if so, + determines whether migration is needed, and then performs the migration + if required. + + Args: + - args (List[str]): List of command-line arguments. + - source (Source): The data source. + """ + config_path = AirbyteEntrypoint(source).extract_config(args) + if config_path: + config = source.read_config(config_path) + cls.emit_control_message(cls.modify_and_save(config_path, source, config)) diff --git a/airbyte-integrations/connectors/source-mailchimp/source_mailchimp/manifest.yaml b/airbyte-integrations/connectors/source-mailchimp/source_mailchimp/manifest.yaml new file mode 100644 index 0000000000000..43c331439f71e --- /dev/null +++ b/airbyte-integrations/connectors/source-mailchimp/source_mailchimp/manifest.yaml @@ -0,0 +1,367 @@ +version: 0.52.0 +type: DeclarativeSource + +check: + type: CheckStream + stream_names: + - "campaigns" + +definitions: + bearer_authenticator: + type: BearerAuthenticator + api_token: "{{ config['credentials']['access_token'] }}" + basic_authenticator: + type: BasicHttpAuthenticator + username: "anystring" + password: "{{ config.get('apikey') or config['credentials']['apikey'] }}" + + transformer_remove_empty_fields: + type: RemoveFields + field_pointers: + - ["**"] + condition: "{{ property|string == '' }}" + + retriever: + type: SimpleRetriever + record_selector: + type: RecordSelector + extractor: + type: DpathExtractor + field_path: ["{{ parameters.get('data_field') }}"] + paginator: + type: "DefaultPaginator" + pagination_strategy: + type: "OffsetIncrement" + page_size: 1000 + page_size_option: + type: "RequestOption" + inject_into: "request_parameter" + field_name: "count" + page_token_option: + type: "RequestOption" + inject_into: "request_parameter" + field_name: "offset" + requester: + url_base: https://{{ config['data_center'] }}.api.mailchimp.com/3.0/ + http_method: GET + authenticator: + type: SelectiveAuthenticator + authenticator_selection_path: ["credentials", "auth_type"] + authenticators: + oauth2.0: "#/definitions/bearer_authenticator" + apikey: "#/definitions/basic_authenticator" + request_parameters: + exclude_fields: "{{ parameters.get('data_field') }}._links" + + base_stream: + retriever: + $ref: "#/definitions/retriever" + + base_incremental_stream: + retriever: + $ref: "#/definitions/retriever" + requester: + $ref: "#/definitions/retriever/requester" + request_parameters: + sort_field: "{{ parameters['cursor_field'] }}" + sort_dir: "ASC" + exclude_fields: "{{ parameters.get('data_field') }}._links" + transformations: + - "#/definitions/transformer_remove_empty_fields" + incremental_sync: + type: DatetimeBasedCursor + cursor_datetime_formats: + - "%Y-%m-%dT%H:%M:%S%z" + datetime_format: "%Y-%m-%dT%H:%M:%S.%fZ" + cursor_field: "{{ parameters['cursor_field'] }}" + start_datetime: + type: MinMaxDatetime + datetime: "{{ config.get('start_date', 
'1970-01-01T00:00:00.0Z') }}" + datetime_format: "%Y-%m-%dT%H:%M:%S.%fZ" + lookback_window: PT0.1S + start_time_option: + inject_into: request_parameter + field_name: "since_{{ parameters['cursor_field'] }}" + type: RequestOption + end_time_option: + inject_into: request_parameter + field_name: "before_{{ parameters['cursor_field'] }}" + type: RequestOption + end_datetime: + type: MinMaxDatetime + datetime: "{{ now_utc().strftime('%Y-%m-%dT%H:%M:%S.%fZ') }}" + datetime_format: "%Y-%m-%dT%H:%M:%S.%fZ" + lookback_window: PT1S + + automations_stream: + type: DeclarativeStream + $ref: "#/definitions/base_incremental_stream" + $parameters: + name: "automations" + primary_key: "id" + path: "automations" + data_field: "automations" + cursor_field: "create_time" + + campaigns_stream: + type: DeclarativeStream + $ref: "#/definitions/base_incremental_stream" + $parameters: + name: "campaigns" + primary_key: "id" + path: "/campaigns" + data_field: "campaigns" + cursor_field: "create_time" + + list_members_stream: + type: DeclarativeStream + $ref: "#/definitions/base_incremental_stream" + retriever: + $ref: "#/definitions/retriever" + partition_router: + type: SubstreamPartitionRouter + parent_stream_configs: + - stream: "#/definitions/lists_stream" + parent_key: id + partition_field: id + state_migrations: + - type: LegacyToPerPartitionStateMigration + $parameters: + name: "list_members" + primary_key: ["id", "list_id"] + path: "/lists/{{ stream_slice.id }}/members" + data_field: "members" + cursor_field: "last_changed" + + lists_stream: + type: DeclarativeStream + $ref: "#/definitions/base_incremental_stream" + $parameters: + name: "lists" + primary_key: "id" + path: "lists" + data_field: "lists" + cursor_field: "date_created" + + tags_stream: + type: DeclarativeStream + $ref: "#/definitions/base_stream" + retriever: + $ref: "#/definitions/retriever" + partition_router: + type: SubstreamPartitionRouter + parent_stream_configs: + - stream: "#/definitions/lists_stream" + parent_key: id + partition_field: id + transformations: + - type: AddFields + fields: + - path: ["list_id"] + value: "{{ stream_slice.id }}" + $parameters: + name: "tags" + primary_key: "id" + path: "lists/{{ stream_slice.id }}/tag-search" + data_field: "tags" + + interest_categories_stream: + type: DeclarativeStream + $ref: "#/definitions/base_stream" + transformations: + - type: AddFields + fields: + - path: ["list_id"] + value: "{{ stream_slice.id }}" + retriever: + $ref: "#/definitions/retriever" + partition_router: + type: SubstreamPartitionRouter + parent_stream_configs: + - stream: "#/definitions/lists_stream" + parent_key: id + partition_field: id + $parameters: + name: "interest_categories" + primary_key: "id" + path: "lists/{{ stream_slice.id }}/interest-categories" + data_field: "categories" + + interests_stream: + type: DeclarativeStream + $ref: "#/definitions/base_stream" + retriever: + $ref: "#/definitions/retriever" + partition_router: + type: SubstreamPartitionRouter + parent_stream_configs: + - stream: "#/definitions/interest_categories_stream" + parent_key: id + partition_field: id + $parameters: + name: "interests" + primary_key: "id" + path: "lists/{{ stream_slice.parent_slice.id }}/interest-categories/{{ stream_slice.id }}/interests" + data_field: "interests" + + reports_stream: + type: DeclarativeStream + $ref: "#/definitions/base_incremental_stream" + $parameters: + name: "reports" + primary_key: "id" + path: "reports" + data_field: "reports" + cursor_field: "send_time" + + segments_stream: + type: 
DeclarativeStream + $ref: "#/definitions/base_incremental_stream" + retriever: + $ref: "#/definitions/retriever" + partition_router: + type: SubstreamPartitionRouter + parent_stream_configs: + - stream: "#/definitions/lists_stream" + parent_key: id + partition_field: id + state_migrations: + - type: LegacyToPerPartitionStateMigration + $parameters: + name: "segments" + primary_key: "id" + path: "/lists/{{ stream_slice.id }}/segments" + data_field: "segments" + cursor_field: "updated_at" + + segment_members_stream: + type: DeclarativeStream + $ref: "#/definitions/base_stream" + retriever: + $ref: "#/definitions/retriever" + partition_router: + type: SubstreamPartitionRouter + parent_stream_configs: + - stream: "#/definitions/segments_stream" + parent_key: id + partition_field: id + record_selector: + $ref: "#/definitions/retriever/record_selector" + record_filter: + type: CustomRecordFilter + class_name: source_mailchimp.components.MailChimpRecordFilter + incremental_sync: + type: DatetimeBasedCursor + cursor_datetime_formats: + - "%Y-%m-%dT%H:%M:%S%z" + datetime_format: "%Y-%m-%dT%H:%M:%S%z" + cursor_field: "{{ parameters['cursor_field'] }}" + start_datetime: + type: MinMaxDatetime + datetime: "{{ config.get('start_date', '1970-01-01T00:00:00.0Z') }}" + datetime_format: "%Y-%m-%dT%H:%M:%S.%fZ" + lookback_window: PT0.1S + transformations: + - type: AddFields + fields: + - path: ["segment_id"] + value: "{{ stream_slice.id }}" + - "#/definitions/transformer_remove_empty_fields" + state_migrations: + - type: LegacyToPerPartitionStateMigration + $parameters: + name: "segment_members" + primary_key: ["id", "segment_id"] + path: "/lists/{{ stream_slice.parent_slice.id }}/segments/{{ stream_slice.id }}/members" + data_field: "members" + cursor_field: "last_changed" + + unsubscribes_stream: + type: DeclarativeStream + $ref: "#/definitions/base_stream" + retriever: + $ref: "#/definitions/retriever" + partition_router: + type: SubstreamPartitionRouter + parent_stream_configs: + - stream: "#/definitions/campaigns_stream" + parent_key: id + partition_field: id + record_selector: + $ref: "#/definitions/retriever/record_selector" + record_filter: + type: CustomRecordFilter + class_name: source_mailchimp.components.MailChimpRecordFilter + incremental_sync: + type: DatetimeBasedCursor + cursor_datetime_formats: + - "%Y-%m-%dT%H:%M:%S%z" + datetime_format: "%Y-%m-%dT%H:%M:%S%z" + cursor_field: "{{ parameters['cursor_field'] }}" + start_datetime: + type: MinMaxDatetime + datetime: "{{ config.get('start_date', '1970-01-01T00:00:00.0Z') }}" + datetime_format: "%Y-%m-%dT%H:%M:%S.%fZ" + lookback_window: PT0.1S + state_migrations: + - type: LegacyToPerPartitionStateMigration + $parameters: + name: "unsubscribes" + primary_key: ["campaign_id", "email_id", "timestamp"] + path: "/reports/{{ stream_slice.id }}/unsubscribed" + data_field: "unsubscribes" + cursor_field: "timestamp" + + email_activity_stream: + type: DeclarativeStream + $ref: "#/definitions/base_stream" + retriever: + $ref: "#/definitions/retriever" + partition_router: + type: SubstreamPartitionRouter + parent_stream_configs: + - stream: "#/definitions/campaigns_stream" + parent_key: id + partition_field: id + record_selector: + type: RecordSelector + extractor: + type: CustomRecordExtractor + class_name: source_mailchimp.components.MailChimpRecordExtractorEmailActivity + field_path: ["{{ parameters.get('data_field') }}"] + incremental_sync: + type: DatetimeBasedCursor + datetime_format: "%Y-%m-%dT%H:%M:%S%z" + cursor_field: "{{ 
parameters['cursor_field'] }}" + start_datetime: + type: MinMaxDatetime + datetime: "{{ config.get('start_date', '1970-01-01T00:00:00Z') }}" + datetime_format: "%Y-%m-%dT%H:%M:%S%z" + lookback_window: PT1S + start_time_option: + inject_into: request_parameter + field_name: "since" + type: RequestOption + lookback_window: PT0.1S + state_migrations: + - type: LegacyToPerPartitionStateMigration + $parameters: + name: "email_activity" + primary_key: ["timestamp", "email_id", "action"] + path: "/reports/{{ stream_slice.id }}/email-activity" + data_field: "emails" + cursor_field: "timestamp" + +streams: + - "#/definitions/automations_stream" + - "#/definitions/campaigns_stream" + - "#/definitions/email_activity_stream" + - "#/definitions/lists_stream" + - "#/definitions/list_members_stream" + - "#/definitions/tags_stream" + - "#/definitions/interest_categories_stream" + - "#/definitions/interests_stream" + - "#/definitions/reports_stream" + - "#/definitions/segments_stream" + - "#/definitions/segment_members_stream" + - "#/definitions/unsubscribes_stream" diff --git a/airbyte-integrations/connectors/source-mailchimp/source_mailchimp/run.py b/airbyte-integrations/connectors/source-mailchimp/source_mailchimp/run.py index 15226fdfeebd0..c4f1b04c5c4fb 100644 --- a/airbyte-integrations/connectors/source-mailchimp/source_mailchimp/run.py +++ b/airbyte-integrations/connectors/source-mailchimp/source_mailchimp/run.py @@ -7,8 +7,10 @@ from airbyte_cdk.entrypoint import launch from source_mailchimp import SourceMailchimp +from source_mailchimp.config_migrations import MigrateDataCenter def run(): source = SourceMailchimp() + MigrateDataCenter.migrate(sys.argv[1:], source) launch(source, sys.argv[1:]) diff --git a/airbyte-integrations/connectors/source-mailchimp/source_mailchimp/source.py b/airbyte-integrations/connectors/source-mailchimp/source_mailchimp/source.py index 0edf00993e5fb..ba650f4cc6529 100644 --- a/airbyte-integrations/connectors/source-mailchimp/source_mailchimp/source.py +++ b/airbyte-integrations/connectors/source-mailchimp/source_mailchimp/source.py @@ -3,147 +3,9 @@ # -import base64 -import re -from typing import Any, List, Mapping, Tuple +from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource -import pendulum -import requests -from airbyte_cdk import AirbyteLogger -from airbyte_cdk.sources import AbstractSource -from airbyte_cdk.sources.streams import Stream -from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator -from pendulum.parsing.exceptions import ParserError -from requests.auth import AuthBase -from .streams import ( - Automations, - Campaigns, - EmailActivity, - InterestCategories, - Interests, - ListMembers, - Lists, - Reports, - SegmentMembers, - Segments, - Tags, - Unsubscribes, -) - - -class MailChimpAuthenticator: - @staticmethod - def get_oauth_data_center(access_token: str) -> str: - """ - Every Mailchimp API request must be sent to a specific data center. - The data center is already embedded in API keys, but not OAuth access tokens. - This method retrieves the data center for OAuth credentials. - """ - try: - response = requests.get( - "https://login.mailchimp.com/oauth2/metadata", headers={"Authorization": "OAuth {}".format(access_token)} - ) - - # Requests to this endpoint will return a 200 status code even if the access token is invalid. - error = response.json().get("error") - if error == "invalid_token": - raise ValueError("The access token you provided was invalid. 
Please check your credentials and try again.") - return response.json()["dc"] - - # Handle any other exceptions that may occur. - except Exception as e: - raise Exception(f"An error occured while retrieving the data center for your account. \n {repr(e)}") - - def get_auth(self, config: Mapping[str, Any]) -> AuthBase: - authorization = config.get("credentials", {}) - auth_type = authorization.get("auth_type") - if auth_type == "apikey" or not authorization: - # API keys have the format -. - # See https://mailchimp.com/developer/marketing/docs/fundamentals/#api-structure - apikey = authorization.get("apikey") or config.get("apikey") - if not apikey: - raise Exception("Please provide a valid API key for authentication.") - auth_string = f"anystring:{apikey}".encode("utf8") - b64_encoded = base64.b64encode(auth_string).decode("utf8") - auth = TokenAuthenticator(token=b64_encoded, auth_method="Basic") - auth.data_center = apikey.split("-").pop() - - elif auth_type == "oauth2.0": - access_token = authorization["access_token"] - auth = TokenAuthenticator(token=access_token, auth_method="Bearer") - auth.data_center = self.get_oauth_data_center(access_token) - - else: - raise Exception(f"Invalid auth type: {auth_type}") - - return auth - - -class SourceMailchimp(AbstractSource): - def _validate_start_date(self, config: Mapping[str, Any]): - start_date = config.get("start_date") - - if start_date: - pattern = re.compile(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z") - if not pattern.match(start_date): # Compare against the pattern descriptor. - return "Please check the format of the start date against the pattern descriptor." - - try: # Handle invalid dates. - parsed_start_date = pendulum.parse(start_date) - except ParserError: - return "The provided start date is not a valid date. Please check the date you input and try again." - - if parsed_start_date > pendulum.now("UTC"): # Handle future start date. - return "The start date cannot be greater than the current date." - - return None - - def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Any]: - # First, check for a valid start date if it is provided - start_date_validation_error = self._validate_start_date(config) - if start_date_validation_error: - return False, start_date_validation_error - - try: - authenticator = MailChimpAuthenticator().get_auth(config) - response = requests.get( - f"https://{authenticator.data_center}.api.mailchimp.com/3.0/ping", headers=authenticator.get_auth_header() - ) - - # A successful response will return a simple JSON object with a single key: health_status. - # Otherwise, errors are returned as a JSON object with keys: - # {type, title, status, detail, instance} - - if not response.json().get("health_status"): - error_title = response.json().get("title", "Unknown Error") - error_details = response.json().get("details", "An unknown error occurred. Please verify your credentials and try again.") - return False, f"Encountered an error while connecting to Mailchimp. Type: {error_title}. Details: {error_details}" - return True, None - - # Handle any other exceptions that may occur. 
- except Exception as e: - return False, repr(e) - - def streams(self, config: Mapping[str, Any]) -> List[Stream]: - authenticator = MailChimpAuthenticator().get_auth(config) - campaign_id = config.get("campaign_id") - start_date = config.get("start_date") - - lists = Lists(authenticator=authenticator, start_date=start_date) - interest_categories = InterestCategories(authenticator=authenticator, parent=lists) - - return [ - Automations(authenticator=authenticator, start_date=start_date), - Campaigns(authenticator=authenticator, start_date=start_date), - EmailActivity(authenticator=authenticator, start_date=start_date, campaign_id=campaign_id), - interest_categories, - Interests(authenticator=authenticator, parent=interest_categories), - lists, - ListMembers(authenticator=authenticator, start_date=start_date), - Reports(authenticator=authenticator, start_date=start_date), - SegmentMembers(authenticator=authenticator, start_date=start_date), - Segments(authenticator=authenticator, start_date=start_date), - Tags(authenticator=authenticator, parent=lists), - Unsubscribes(authenticator=authenticator, start_date=start_date, campaign_id=campaign_id), - ] +class SourceMailchimp(YamlDeclarativeSource): + def __init__(self): + super().__init__(**{"path_to_yaml": "manifest.yaml"}) diff --git a/airbyte-integrations/connectors/source-mailchimp/source_mailchimp/spec.json b/airbyte-integrations/connectors/source-mailchimp/source_mailchimp/spec.json index f88649faa1533..11fb4936ae92a 100644 --- a/airbyte-integrations/connectors/source-mailchimp/source_mailchimp/spec.json +++ b/airbyte-integrations/connectors/source-mailchimp/source_mailchimp/spec.json @@ -70,9 +70,10 @@ "pattern_descriptor": "YYYY-MM-DDTHH:MM:SS.000Z", "examples": ["2020-01-01T00:00:00.000Z"] }, - "campaign_id": { + "data_center": { + "title": "DataCenter", + "description": "Technical field used to identify the data center to send requests to", "type": "string", - "title": "ID of a campaign to sync email activities", "airbyte_hidden": true } } diff --git a/airbyte-integrations/connectors/source-mailchimp/source_mailchimp/streams.py b/airbyte-integrations/connectors/source-mailchimp/source_mailchimp/streams.py deleted file mode 100644 index 158eaf1e8b47d..0000000000000 --- a/airbyte-integrations/connectors/source-mailchimp/source_mailchimp/streams.py +++ /dev/null @@ -1,518 +0,0 @@ -# -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
-# - -import logging -import math -from abc import ABC, abstractmethod -from typing import Any, Iterable, List, Mapping, MutableMapping, Optional - -import pendulum -import requests -from airbyte_cdk.models import SyncMode -from airbyte_cdk.sources.streams.core import StreamData -from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream - -logger = logging.getLogger("airbyte") - - -class MailChimpStream(HttpStream, ABC): - primary_key = "id" - page_size = 1000 - - def __init__(self, **kwargs): - super().__init__(**kwargs) - self.current_offset = 0 - self.data_center = kwargs["authenticator"].data_center - - @property - def url_base(self) -> str: - return f"https://{self.data_center}.api.mailchimp.com/3.0/" - - def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: - decoded_response = response.json() - api_data = decoded_response[self.data_field] - if len(api_data) < self.page_size: - self.current_offset = 0 - return None - else: - self.current_offset += self.page_size - return {"offset": self.current_offset} - - def request_params( - self, - stream_state: Mapping[str, Any], - stream_slice: Mapping[str, Any] = None, - next_page_token: Mapping[str, Any] = None, - ) -> MutableMapping[str, Any]: - - # The ._links field is returned by most Mailchimp endpoints and contains non-relevant schema metadata. - params = {"count": self.page_size, "exclude_fields": f"{self.data_field}._links"} - - # Handle pagination by inserting the next page's token in the request parameters - if next_page_token: - params.update(next_page_token) - return params - - def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: - response_json = response.json() - yield from response_json[self.data_field] - - @property - @abstractmethod - def data_field(self) -> str: - """The response entry that contains useful data""" - pass - - def read_records( - self, - sync_mode: SyncMode, - cursor_field: List[str] = None, - stream_slice: Mapping[str, Any] = None, - stream_state: Mapping[str, Any] = None, - ) -> Iterable[StreamData]: - try: - yield from super().read_records( - sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state - ) - except requests.exceptions.JSONDecodeError: - logger.error(f"Unknown error while reading stream {self.name}. Response cannot be read properly. ") - - -class IncrementalMailChimpStream(MailChimpStream, ABC): - state_checkpoint_interval = math.inf - - def __init__(self, **kwargs): - self.start_date = kwargs.pop("start_date", None) - super().__init__(**kwargs) - - @property - @abstractmethod - def cursor_field(self) -> str: - """ - Defining a cursor field indicates that a stream is incremental, so any incremental stream must extend this class - and define a cursor field. - """ - pass - - @property - def filter_field(self): - return f"since_{self.cursor_field}" - - @property - def sort_field(self): - return self.cursor_field - - def filter_empty_fields(self, element: Mapping[str, Any]) -> Mapping[str, Any]: - """ - Many Mailchimp endpoints return empty strings instead of null values. - This causes validation errors on datetime columns, so for safety, we need to check for empty strings and set their value to None/null. - This method recursively traverses each element in a record and replaces any "" values with None, based on three conditions: - - 1. If the element is a dictionary, apply the method recursively to each value in the dictionary. - 2. 
If the element is a list, apply the method recursively to each item in the list. - 3. If the element is a string, check if it is an empty string. If so, replace it with None. - """ - - if isinstance(element, dict): - element = {k: self.filter_empty_fields(v) if v != "" else None for k, v in element.items()} - elif isinstance(element, list): - element = [self.filter_empty_fields(v) for v in element] - return element - - def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: - """ - Return the latest state by comparing the cursor value in the latest record with the stream's most recent state object - and returning an updated state object. - """ - latest_state = latest_record.get(self.cursor_field) - current_state = current_stream_state.get(self.cursor_field) or latest_state - return {self.cursor_field: max(latest_state, current_state)} - - def stream_slices( - self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None - ) -> Iterable[Optional[Mapping[str, Any]]]: - slice_ = {} - stream_state = stream_state or {} - cursor_value = self.get_filter_date(self.start_date, stream_state.get(self.cursor_field)) - if cursor_value: - slice_[self.filter_field] = cursor_value - yield slice_ - - @staticmethod - def get_filter_date(start_date: str, state_date: str) -> str: - """ - Calculate the filter date to pass in the request parameters by comparing the start_date - with the value of state obtained from the stream_slice. - If only one value exists, use it by default. Otherwise, return None. - If no filter_date is provided, the API will fetch all available records. - """ - - start_date_parsed = pendulum.parse(start_date).to_iso8601_string() if start_date else None - state_date_parsed = pendulum.parse(state_date).to_iso8601_string() if state_date else None - - # Return the max of the two dates if both are present. Otherwise return whichever is present, or None. - if start_date_parsed or state_date_parsed: - return max(filter(None, [start_date_parsed, state_date_parsed]), default=None) - - def filter_old_records(self, records: Iterable, filter_date) -> Iterable: - """ - Filters out records with older cursor_values than the filter_date. - This can be used to enforce the filter for incremental streams that do not support sorting/filtering via query params. - """ - for record in records: - record_cursor_value = record.get(self.cursor_field) - if not filter_date or record_cursor_value >= filter_date: - yield record - - def request_params(self, stream_state=None, stream_slice=None, **kwargs): - stream_state = stream_state or {} - stream_slice = stream_slice or {} - params = super().request_params(stream_state=stream_state, stream_slice=stream_slice, **kwargs) - default_params = {"sort_field": self.sort_field, "sort_dir": "ASC", **stream_slice} - params.update(default_params) - return params - - def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: - response = super().parse_response(response, **kwargs) - for record in response: - yield self.filter_empty_fields(record) - - -class MailChimpListSubStream(IncrementalMailChimpStream): - """ - Base class for incremental Mailchimp streams that are children of the Lists stream. 
- """ - - def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]: - stream_state = stream_state or {} - parent = Lists(authenticator=self.authenticator).read_records(sync_mode=SyncMode.full_refresh) - for parent_record in parent: - slice = {"list_id": parent_record["id"]} - cursor_value = self.get_filter_date(self.start_date, stream_state.get(parent_record["id"], {}).get(self.cursor_field)) - if cursor_value: - slice[self.filter_field] = cursor_value - yield slice - - def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: - list_id = stream_slice.get("list_id") - return f"lists/{list_id}/{self.data_field}" - - def request_params(self, stream_state=None, stream_slice=None, **kwargs) -> MutableMapping[str, Any]: - params = super().request_params(stream_state=stream_state, stream_slice=stream_slice, **kwargs) - - # Get the current state value for this list_id, if it exists - # Then, use the value in state to filter the request - current_slice = stream_slice.get("list_id") - filter_date = stream_state.get(current_slice) - if filter_date: - params[self.filter_field] = filter_date.get(self.cursor_field) - return params - - def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: - current_stream_state = current_stream_state or {} - list_id = latest_record.get("list_id") - latest_cursor_value = latest_record.get(self.cursor_field) - - # Get the current state value for this list, if it exists - list_state = current_stream_state.get(list_id, {}) - current_cursor_value = list_state.get(self.cursor_field, latest_cursor_value) - - # Update the cursor value and set it in state - updated_cursor_value = max(current_cursor_value, latest_cursor_value) - current_stream_state[list_id] = {self.cursor_field: updated_cursor_value} - - return current_stream_state - - -class Lists(IncrementalMailChimpStream): - cursor_field = "date_created" - data_field = "lists" - - def path(self, **kwargs) -> str: - return "lists" - - -class Campaigns(IncrementalMailChimpStream): - cursor_field = "create_time" - data_field = "campaigns" - - def path(self, **kwargs) -> str: - return "campaigns" - - -class Automations(IncrementalMailChimpStream): - """Doc Link: https://mailchimp.com/developer/marketing/api/automation/get-automation-info/""" - - cursor_field = "create_time" - data_field = "automations" - - def path(self, **kwargs) -> str: - return "automations" - - -class EmailActivity(IncrementalMailChimpStream): - cursor_field = "timestamp" - filter_field = "since" - sort_field = "create_time" - data_field = "emails" - primary_key = ["timestamp", "email_id", "action"] - - def __init__(self, campaign_id: Optional[str] = None, **kwargs): - super().__init__(**kwargs) - self.campaign_id = campaign_id - - def stream_slices( - self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None - ) -> Iterable[Optional[Mapping[str, Any]]]: - stream_state = stream_state or {} - if self.campaign_id: - # this is a workaround to speed up SATs and enable incremental tests - campaigns = [{"id": self.campaign_id}] - else: - campaigns = Campaigns(authenticator=self.authenticator).read_records(sync_mode=SyncMode.full_refresh) - for campaign in campaigns: - slice_ = {"campaign_id": campaign["id"]} - state_value = stream_state.get(campaign["id"], {}).get(self.cursor_field) - cursor_value = self.get_filter_date(self.start_date, state_value) - if cursor_value: - 
slice_[self.filter_field] = cursor_value - yield slice_ - - def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: - campaign_id = stream_slice["campaign_id"] - return f"reports/{campaign_id}/email-activity" - - def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: - """ - Return the latest state by comparing the campaign_id and cursor value in the latest record with the stream's most recent state object - and returning an updated state object. - """ - campaign_id = latest_record.get("campaign_id") - latest_cursor_value = latest_record.get(self.cursor_field) - current_stream_state = current_stream_state or {} - current_state = current_stream_state.get(campaign_id) if current_stream_state else None - if current_state: - current_state = current_state.get(self.cursor_field) - current_state_value = current_state or latest_cursor_value - max_value = max(current_state_value, latest_cursor_value) - new_value = {self.cursor_field: max_value} - - current_stream_state[campaign_id] = new_value - return current_stream_state - - def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: - try: - response_json = response.json() - except requests.exceptions.JSONDecodeError: - logger.error(f"Response returned with {response.status_code=}, {response.content=}") - response_json = {} - # transform before save - # [{'campaign_id', 'list_id', 'list_is_active', 'email_id', 'email_address', 'activity[array[object]]', '_links'}] -> - # -> [[{'campaign_id', 'list_id', 'list_is_active', 'email_id', 'email_address', '**activity[i]', '_links'}, ...]] - data = response_json.get(self.data_field, []) - for item in data: - for activity_item in item.pop("activity", []): - yield {**item, **activity_item} - - -class InterestCategories(MailChimpStream, HttpSubStream): - """ - Get information about interest categories for a specific list. - Docs link: https://mailchimp.com/developer/marketing/api/interest-categories/list-interest-categories/ - """ - - data_field = "categories" - - def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: - """ - Get the list_id from the parent stream slice and use it to construct the path. - """ - list_id = stream_slice.get("parent").get("id") - return f"lists/{list_id}/interest-categories" - - -class Interests(MailChimpStream, HttpSubStream): - """ - Get a list of interests for a specific interest category. - Docs link: https://mailchimp.com/developer/marketing/api/interests/list-interests-in-category/ - """ - - data_field = "interests" - - def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: - """ - Get the list_id from the parent stream slice and use it to construct the path. - """ - list_id = stream_slice.get("parent").get("list_id") - category_id = stream_slice.get("parent").get("id") - return f"lists/{list_id}/interest-categories/{category_id}/interests" - - -class ListMembers(MailChimpListSubStream): - """ - Get information about members in a specific Mailchimp list. - Docs link: https://mailchimp.com/developer/marketing/api/list-members/list-members-info/ - """ - - cursor_field = "last_changed" - data_field = "members" - - -class Reports(IncrementalMailChimpStream): - cursor_field = "send_time" - data_field = "reports" - - def path(self, **kwargs) -> str: - return "reports" - - -class SegmentMembers(MailChimpListSubStream): - """ - Get information about members in a specific segment. 
- Docs link: https://mailchimp.com/developer/marketing/api/list-segment-members/list-members-in-segment/ - """ - - cursor_field = "last_changed" - data_field = "members" - - def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]: - """ - Each slice consists of a list_id and segment_id pair - """ - segments_slices = Segments(authenticator=self.authenticator).stream_slices(sync_mode=SyncMode.full_refresh) - - for slice in segments_slices: - segment_records = Segments(authenticator=self.authenticator).read_records(sync_mode=SyncMode.full_refresh, stream_slice=slice) - - for segment in segment_records: - yield {"list_id": segment["list_id"], "segment_id": segment["id"]} - - def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: - list_id = stream_slice.get("list_id") - segment_id = stream_slice.get("segment_id") - return f"lists/{list_id}/segments/{segment_id}/members" - - def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], stream_slice, **kwargs) -> Iterable[Mapping]: - """ - The SegmentMembers endpoint does not support sorting or filtering, - so we need to apply our own filtering logic before reading. - The foreign key "segment_id" is also added to each record before being read. - """ - response = super().parse_response(response, **kwargs) - - # Calculate the filter date to compare all records against in this slice - slice_cursor_value = stream_state.get(str(stream_slice.get("segment_id")), {}).get(self.cursor_field) - filter_date = self.get_filter_date(self.start_date, slice_cursor_value) - - for record in self.filter_old_records(response, filter_date): - # Add the segment_id foreign_key to each record - record["segment_id"] = stream_slice.get("segment_id") - yield record - - def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: - current_stream_state = current_stream_state or {} - segment_id = str(latest_record.get("segment_id")) - latest_cursor_value = latest_record.get(self.cursor_field) - - # Get the current state value for this list, if it exists - segment_state = current_stream_state.get(segment_id, {}) - current_cursor_value = segment_state.get(self.cursor_field, latest_cursor_value) - - # Update the cursor value and set it in state - updated_cursor_value = max(current_cursor_value, latest_cursor_value) - current_stream_state[segment_id] = {self.cursor_field: updated_cursor_value} - return current_stream_state - - -class Segments(MailChimpListSubStream): - """ - Get information about all available segments for a specific list. - Docs link: https://mailchimp.com/developer/marketing/api/list-segments/list-segments/ - """ - - cursor_field = "updated_at" - data_field = "segments" - - -class Tags(MailChimpStream, HttpSubStream): - """ - Get information about tags for a specific list. - Docs link: https://mailchimp.com/developer/marketing/api/list-tags/list-tags-for-list/ - """ - - data_field = "tags" - - def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: - list_id = stream_slice.get("parent").get("id") - return f"lists/{list_id}/tag-search" - - def parse_response(self, response: requests.Response, stream_slice, **kwargs) -> Iterable[Mapping]: - """ - Tags do not reference parent_ids, so we need to add the list_id to each record. 
- """ - response = super().parse_response(response, **kwargs) - - for record in response: - record["list_id"] = stream_slice.get("parent").get("id") - yield record - - -class Unsubscribes(IncrementalMailChimpStream): - """ - List of members who have unsubscribed from a specific campaign. - Docs link: https://mailchimp.com/developer/marketing/api/unsub-reports/list-unsubscribed-members/ - """ - - cursor_field = "timestamp" - data_field = "unsubscribes" - # There is no unique identifier for unsubscribes, so we use a composite key - # consisting of the campaign_id, email_id, and timestamp. - primary_key = ["campaign_id", "email_id", "timestamp"] - - def __init__(self, campaign_id: Optional[str] = None, **kwargs): - super().__init__(**kwargs) - self.campaign_id = campaign_id - - def stream_slices( - self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None - ) -> Iterable[Optional[Mapping[str, Any]]]: - - if self.campaign_id: - # Similar to EmailActivity stream, this is a workaround to speed up SATs - # and enable incremental tests by reading from a single campaign - campaigns = [{"id": self.campaign_id}] - else: - campaigns = Campaigns(authenticator=self.authenticator).read_records(sync_mode=SyncMode.full_refresh) - for campaign in campaigns: - yield {"campaign_id": campaign["id"]} - - def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: - campaign_id = stream_slice.get("campaign_id") - return f"reports/{campaign_id}/unsubscribed" - - def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], stream_slice, **kwargs) -> Iterable[Mapping]: - """ - The Unsubscribes endpoint does not support sorting or filtering, - so we need to apply our own filtering logic before reading. 
- """ - - response = super().parse_response(response, **kwargs) - - slice_cursor_value = stream_state.get(stream_slice.get("campaign_id", {}), {}).get(self.cursor_field) - filter_date = self.get_filter_date(self.start_date, slice_cursor_value) - yield from self.filter_old_records(response, filter_date) - - def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: - current_stream_state = current_stream_state or {} - campaign_id = latest_record.get("campaign_id") - latest_cursor_value = latest_record.get(self.cursor_field) - - # Get the current state value for this campaign, if it exists - campaign_state = current_stream_state.get(campaign_id, {}) - current_cursor_value = campaign_state.get(self.cursor_field, latest_cursor_value) - - # Update the cursor value and set it in state - updated_cursor_value = max(current_cursor_value, latest_cursor_value) - current_stream_state[campaign_id] = {self.cursor_field: updated_cursor_value} - return current_stream_state diff --git a/airbyte-integrations/connectors/source-mailchimp/unit_tests/conftest.py b/airbyte-integrations/connectors/source-mailchimp/unit_tests/conftest.py index 5305f0dadab45..c387004a5110b 100644 --- a/airbyte-integrations/connectors/source-mailchimp/unit_tests/conftest.py +++ b/airbyte-integrations/connectors/source-mailchimp/unit_tests/conftest.py @@ -3,8 +3,6 @@ # from pytest import fixture -from source_mailchimp.source import MailChimpAuthenticator -from source_mailchimp.streams import Campaigns, Unsubscribes @fixture(name="data_center") @@ -42,36 +40,3 @@ def apikey_config_fixture(data_center): @fixture(name="wrong_config") def wrong_config_fixture(): return {"credentials": {"auth_type": "not auth_type"}} - - -@fixture(name="auth") -def authenticator_fixture(apikey_config): - return MailChimpAuthenticator().get_auth(apikey_config) - - -@fixture(name="campaigns_stream") -def campaigns_stream_fixture(auth): - return Campaigns(authenticator=auth) - - -@fixture(name="unsubscribes_stream") -def unsubscribes_stream_fixture(auth): - return Unsubscribes(authenticator=auth) - - -@fixture(name="mock_campaigns_response") -def mock_campaigns_response_fixture(): - return [ - {"id": "campaign_1", "web_id": 1, "type": "regular", "create_time": "2022-01-01T00:00:00Z"}, - {"id": "campaign_2", "web_id": 2, "type": "plaintext", "create_time": "2022-01-02T00:00:00Z"}, - {"id": "campaign_3", "web_id": 3, "type": "variate", "create_time": "2022-01-03T00:00:00Z"}, - ] - - -@fixture(name="mock_unsubscribes_state") -def mock_unsubscribes_state_fixture(): - return { - "campaign_1": {"timestamp": "2022-01-01T00:00:00Z"}, - "campaign_2": {"timestamp": "2022-01-02T00:00:00Z"}, - "campaign_3": {"timestamp": "2022-01-03T00:00:00Z"}, - } diff --git a/airbyte-integrations/connectors/source-mailchimp/unit_tests/integration/__init__.py b/airbyte-integrations/connectors/source-mailchimp/unit_tests/integration/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/airbyte-integrations/connectors/source-mailchimp/unit_tests/integration/config.py b/airbyte-integrations/connectors/source-mailchimp/unit_tests/integration/config.py new file mode 100644 index 0000000000000..7e363fea96775 --- /dev/null +++ b/airbyte-integrations/connectors/source-mailchimp/unit_tests/integration/config.py @@ -0,0 +1,16 @@ +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
+ +from datetime import datetime +from typing import Any, Dict + + +class ConfigBuilder: + def __init__(self) -> None: + self._config: Dict[str, Any] = {"credentials": {"auth_type": "apikey", "apikey": "Mailchimp_token-us10"}, "data_center": "us10"} + + def with_start_date(self, start_datetime: datetime) -> "ConfigBuilder": + self._config["start_date"] = start_datetime.isoformat()[:-3] + "Z" + return self + + def build(self) -> Dict[str, Any]: + return self._config diff --git a/airbyte-integrations/connectors/source-mailchimp/unit_tests/integration/test_automations.py b/airbyte-integrations/connectors/source-mailchimp/unit_tests/integration/test_automations.py new file mode 100644 index 0000000000000..602562aa2db46 --- /dev/null +++ b/airbyte-integrations/connectors/source-mailchimp/unit_tests/integration/test_automations.py @@ -0,0 +1,119 @@ +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +import datetime +import json +from unittest import TestCase + +import freezegun +from airbyte_cdk.models import SyncMode +from airbyte_cdk.test.catalog_builder import CatalogBuilder +from airbyte_cdk.test.entrypoint_wrapper import read +from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse +from airbyte_cdk.test.mock_http.response_builder import find_template +from airbyte_cdk.test.state_builder import StateBuilder +from source_mailchimp import SourceMailchimp + +from .config import ConfigBuilder + +_CONFIG = ConfigBuilder().with_start_date(datetime.datetime(2023, 1, 1, 0, 0, 0, 1000)).build() + + +def _create_catalog(sync_mode: SyncMode = SyncMode.full_refresh): + return CatalogBuilder().with_stream(name="automations", sync_mode=sync_mode).build() + + +@freezegun.freeze_time("2023-01-31T23:59:59.001000Z") +class AutomationsTest(TestCase): + def setUp(self) -> None: + """Base setup for all tests. 
Enter test mocker."""
+
+        self.r_mock = HttpMocker()
+        self.r_mock.__enter__()
+
+    def tearDown(self):
+        """Stops and resets HttpMocker instance."""
+        self.r_mock.__exit__(None, None, None)
+
+    def test_read_full_refresh_no_pagination(self):
+        """Ensure http integration and record extraction"""
+        self.r_mock.get(
+            HttpRequest(
+                url="https://us10.api.mailchimp.com/3.0/automations",
+                query_params={
+                    "sort_field": "create_time",
+                    "sort_dir": "ASC",
+                    "exclude_fields": "automations._links",
+                    "count": 1000,
+                    "since_create_time": "2022-12-31T23:59:59.001000Z",
+                    "before_create_time": "2023-01-31T23:59:59.001000Z",
+                },
+            ),
+            HttpResponse(json.dumps(find_template("automations", __file__)), 200),
+        )
+
+        source = SourceMailchimp()
+        actual_messages = read(source, config=_CONFIG, catalog=_create_catalog())
+
+        assert len(actual_messages.records) == 1
+
+    def test_full_refresh_with_pagination(self):
+        """Ensure pagination"""
+        self.r_mock.get(
+            HttpRequest(
+                url="https://us10.api.mailchimp.com/3.0/automations",
+                query_params={
+                    "sort_field": "create_time",
+                    "sort_dir": "ASC",
+                    "exclude_fields": "automations._links",
+                    "count": 1000,
+                    "since_create_time": "2022-12-31T23:59:59.001000Z",
+                    "before_create_time": "2023-01-31T23:59:59.001000Z",
+                },
+            ),
+            HttpResponse(json.dumps({"automations": find_template("automations", __file__)["automations"] * 1002}), 200),
+        )
+        self.r_mock.get(
+            HttpRequest(
+                url="https://us10.api.mailchimp.com/3.0/automations",
+                query_params={
+                    "sort_field": "create_time",
+                    "sort_dir": "ASC",
+                    "exclude_fields": "automations._links",
+                    "count": 1000,
+                    "offset": 1002,
+                    "since_create_time": "2022-12-31T23:59:59.001000Z",
+                    "before_create_time": "2023-01-31T23:59:59.001000Z",
+                },
+            ),
+            HttpResponse(json.dumps(find_template("automations", __file__)), 200),
+        )
+        source = SourceMailchimp()
+        actual_messages = read(source, config=_CONFIG, catalog=_create_catalog())
+
+        assert len(actual_messages.records) == 1003
+
+    def test_when_read_incrementally_then_emit_state_message(self):
+        """Ensure incremental sync emits correct stream state message"""
+
+        self.r_mock.get(
+            HttpRequest(
+                url="https://us10.api.mailchimp.com/3.0/automations",
+                query_params={
+                    "sort_field": "create_time",
+                    "sort_dir": "ASC",
+                    "exclude_fields": "automations._links",
+                    "count": 1000,
+                    "since_create_time": "2022-12-31T23:59:59.001000Z",
+                    "before_create_time": "2023-01-31T23:59:59.001000Z",
+                },
+            ),
+            HttpResponse(json.dumps(find_template("automations", __file__)), 200),
+        )
+
+        source = SourceMailchimp()
+        actual_messages = read(
+            source,
+            config=_CONFIG,
+            catalog=_create_catalog(sync_mode=SyncMode.incremental),
+            state=StateBuilder().with_stream_state("automations", {"create_time": "2220-11-23T05:42:11+00:00"}).build(),
+        )
+        # With a future-dated input state and older template records, the emitted state should retain the input cursor.
+        assert actual_messages.state_messages[0].state.stream.stream_state == {"create_time": "2220-11-23T05:42:11+00:00"}
diff --git a/airbyte-integrations/connectors/source-mailchimp/unit_tests/resource/http/response/automations.json b/airbyte-integrations/connectors/source-mailchimp/unit_tests/resource/http/response/automations.json
new file mode 100644
index 0000000000000..3be7ec7ee9726
--- /dev/null
+++ b/airbyte-integrations/connectors/source-mailchimp/unit_tests/resource/http/response/automations.json
@@ -0,0 +1,68 @@
+{
+  "automations": [
+    {
+      "id": "string",
+      "create_time": "2019-08-24T14:15:22Z",
+      "start_time": "2019-08-24T14:15:22Z",
+      "status": "save",
+      "emails_sent": 0,
+      "recipients": {
+        "list_id": "string",
+        "list_is_active": true,
+        "list_name": "string",
+
"segment_opts": { + "saved_segment_id": 0, + "match": "any", + "conditions": [null] + }, + "store_id": "1a2df69xxx" + }, + "settings": { + "title": "string", + "from_name": "string", + "reply_to": "string", + "use_conversation": true, + "to_name": "string", + "authenticate": true, + "auto_footer": true, + "inline_css": true + }, + "tracking": { + "opens": true, + "html_clicks": true, + "text_clicks": true, + "goal_tracking": true, + "ecomm360": true, + "google_analytics": "string", + "clicktale": "string", + "salesforce": { + "campaign": true, + "notes": true + }, + "capsule": { + "notes": true + } + }, + "trigger_settings": { + "workflow_type": "abandonedBrowse", + "workflow_title": "string", + "runtime": { + "days": ["sunday"], + "hours": { + "type": "send_asap" + } + }, + "workflow_emails_count": 0 + }, + "report_summary": { + "opens": 0, + "unique_opens": 0, + "open_rate": 0, + "clicks": 0, + "subscriber_clicks": 0, + "click_rate": 0 + } + } + ], + "total_items": 2 +} diff --git a/airbyte-integrations/connectors/source-mailchimp/unit_tests/test_component_custom_email_activity_extractor.py b/airbyte-integrations/connectors/source-mailchimp/unit_tests/test_component_custom_email_activity_extractor.py new file mode 100644 index 0000000000000..20332dd740813 --- /dev/null +++ b/airbyte-integrations/connectors/source-mailchimp/unit_tests/test_component_custom_email_activity_extractor.py @@ -0,0 +1,66 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + + +import json + +import requests +from airbyte_cdk.sources.declarative.decoders import JsonDecoder +from source_mailchimp.components import MailChimpRecordExtractorEmailActivity + + +def test_email_activity_extractor(): + decoder = JsonDecoder(parameters={}) + field_path = ["emails"] + config = {"response_override": "stop_if_you_see_me"} + extractor = MailChimpRecordExtractorEmailActivity(field_path=field_path, decoder=decoder, config=config, parameters={}) + + body = { + "emails": [ + { + "campaign_id": "string", + "list_id": "string", + "list_is_active": True, + "email_id": "string", + "email_address": "AirbyteMailchimpUser@gmail.com", + "activity": [ + {"action": "close", "type": "string", "timestamp": "2019-08-24T14:15:22Z", "url": "string", "ip": "string"}, + {"action": "open", "type": "string", "timestamp": "2019-08-24T14:15:22Z", "url": "string", "ip": "string"}, + ], + } + ], + "campaign_id": "string", + "total_items": 0, + } + response = requests.Response() + response._content = json.dumps(body).encode("utf-8") + + expected_records = [ + { + "action": "close", + "campaign_id": "string", + "email_address": "AirbyteMailchimpUser@gmail.com", + "email_id": "string", + "ip": "string", + "list_id": "string", + "list_is_active": True, + "timestamp": "2019-08-24T14:15:22Z", + "type": "string", + "url": "string", + }, + { + "action": "open", + "campaign_id": "string", + "email_address": "AirbyteMailchimpUser@gmail.com", + "email_id": "string", + "ip": "string", + "list_id": "string", + "list_is_active": True, + "timestamp": "2019-08-24T14:15:22Z", + "type": "string", + "url": "string", + }, + ] + + assert extractor.extract_records(response=response) == expected_records diff --git a/airbyte-integrations/connectors/source-mailchimp/unit_tests/test_component_custom_filter.py b/airbyte-integrations/connectors/source-mailchimp/unit_tests/test_component_custom_filter.py new file mode 100644 index 0000000000000..e4454a92f8772 --- /dev/null +++ 
b/airbyte-integrations/connectors/source-mailchimp/unit_tests/test_component_custom_filter.py @@ -0,0 +1,80 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + + +import pytest +from airbyte_cdk.sources.declarative.types import StreamSlice +from source_mailchimp.components import MailChimpRecordFilter + + +@pytest.mark.parametrize( + ["config", "stream_state", "len_expected_records"], + [ + [ + {"start_date": "2020-02-16T17:30:00.000Z"}, + { + "states": [ + { + "partition": { + "id": "7847cdaeff", + "parent_slice": {"end_time": "2023-01-07T12:50:16.411612Z", "start_time": "2022-12-07T12:50:17.411612Z"}, + }, + "cursor": {"timestamp": "2024-02-19T12:50:18+0000"}, + } + ] + }, + 0, + ], + [{"start_date": "2020-02-16T17:30:00.000Z"}, {}, 2], + [{}, {}, 2], + [ + {}, + { + "states": [ + { + "partition": { + "id": "7847cdaeff", + "parent_slice": {"end_time": "2023-01-07T12:50:16.411612Z", "start_time": "2022-12-07T12:50:17.411612Z"}, + }, + "cursor": {"timestamp": "2021-02-19T12:50:18+0000"}, + } + ] + }, + 1, + ], + ], + ids=[ + "start_date_and_stream_state", + "start_date_and_NO_stream_state", + "NO_start_date_and_NO_stream_state", + "NO_start_date_and_stream_state", + ], +) +def test_mailchimp_custom_filter(config: dict, stream_state: dict, len_expected_records: int): + stream_slice = StreamSlice( + partition={"id": "7847cdaeff"}, cursor_slice={"end_time": "2024-02-19T13:33:56+0000", "start_time": "2022-10-07T13:33:56+0000"} + ) + parameters = { + "name": "segment_members", + "cursor_field": "timestamp", + } + record_filter = MailChimpRecordFilter(config=config, condition="", parameters=parameters) + + records = [ + { + "id": "1dd067951f91190b65b43305b9166bc7", + "timestamp": "2020-12-27T08:34:39+00:00", + "campaign_id": "7847cdaeff", + "segment_id": 13506120, + }, + { + "id": "1dd067951f91190b65b43305b9166bc7", + "timestamp": "2022-12-27T08:34:39+00:00", + "campaign_id": "7847cdaeff", + "segment_id": 13506120, + }, + ] + + actual_records = record_filter.filter_records(records, stream_state=stream_state, stream_slice=stream_slice) + assert len(actual_records) == len_expected_records diff --git a/airbyte-integrations/connectors/source-mailchimp/unit_tests/test_config_datacenter_migration.py b/airbyte-integrations/connectors/source-mailchimp/unit_tests/test_config_datacenter_migration.py new file mode 100644 index 0000000000000..20fe8312352e2 --- /dev/null +++ b/airbyte-integrations/connectors/source-mailchimp/unit_tests/test_config_datacenter_migration.py @@ -0,0 +1,37 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
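+# Note: MigrateDataCenter.migrate rewrites the config file at the given path in place,
+# so these tests assert against the file contents on disk after the migration runs.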
+# +import json +import os +from typing import Any, Mapping + +import pytest +from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource +from source_mailchimp import SourceMailchimp +from source_mailchimp.config_migrations import MigrateDataCenter + +# BASE ARGS +SOURCE: YamlDeclarativeSource = SourceMailchimp() + + +# HELPERS +def load_config(config_path: str) -> Mapping[str, Any]: + with open(config_path, "r") as config: + return json.load(config) + + +@pytest.mark.parametrize( + "config_path", + [ + (f"{os.path.dirname(__file__)}/test_configs/test_config_api_key.json"), + (f"{os.path.dirname(__file__)}/test_configs/test_config_oauth.json"), + ], + ids=["test_requester_datacenter_with_api_key", "test_requester_datacenter_with_oauth_flow"], +) +def test_mailchimp_config_migration(config_path: str, requests_mock): + requests_mock.get("https://login.mailchimp.com/oauth2/metadata", json={"dc": "us10"}) + + migration_instance = MigrateDataCenter + migration_instance.migrate(["check", "--config", config_path], SOURCE) + test_migrated_config = load_config(config_path) + assert test_migrated_config.get("data_center") == "us10" diff --git a/airbyte-integrations/connectors/source-mailchimp/unit_tests/test_configs/test_config_api_key.json b/airbyte-integrations/connectors/source-mailchimp/unit_tests/test_configs/test_config_api_key.json new file mode 100644 index 0000000000000..20866d9bfcf7b --- /dev/null +++ b/airbyte-integrations/connectors/source-mailchimp/unit_tests/test_configs/test_config_api_key.json @@ -0,0 +1,4 @@ +{ + "credentials": { "auth_type": "apikey", "apikey": "random_api_key-us10" }, + "data_center": "us10" +} diff --git a/airbyte-integrations/connectors/source-mailchimp/unit_tests/test_configs/test_config_oauth.json b/airbyte-integrations/connectors/source-mailchimp/unit_tests/test_configs/test_config_oauth.json new file mode 100644 index 0000000000000..ef7ef97ee2413 --- /dev/null +++ b/airbyte-integrations/connectors/source-mailchimp/unit_tests/test_configs/test_config_oauth.json @@ -0,0 +1,8 @@ +{ + "credentials": { + "auth_type": "oauth2.0", + "client_id": "client_id", + "client_secret": "client_secret", + "access_token": "access_token" + } +} diff --git a/airbyte-integrations/connectors/source-mailchimp/unit_tests/test_source.py b/airbyte-integrations/connectors/source-mailchimp/unit_tests/test_source.py deleted file mode 100644 index b1ccfcddac6ad..0000000000000 --- a/airbyte-integrations/connectors/source-mailchimp/unit_tests/test_source.py +++ /dev/null @@ -1,116 +0,0 @@ -# -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. -# - -import logging - -import pytest -from source_mailchimp.source import MailChimpAuthenticator, SourceMailchimp - -logger = logging.getLogger("airbyte") - - -def test_check_connection_ok(requests_mock, config, data_center): - responses = [ - {"json": {"health_status": "Everything's Chimpy!"}}, - ] - requests_mock.register_uri("GET", f"https://{data_center}.api.mailchimp.com/3.0/ping", responses) - ok, error_msg = SourceMailchimp().check_connection(logger, config=config) - - assert ok - assert not error_msg - - -@pytest.mark.parametrize( - "response, expected_message", - [ - ( - { - "json": { - "title": "API Key Invalid", - "details": "Your API key may be invalid, or you've attempted to access the wrong datacenter.", - } - }, - "Encountered an error while connecting to Mailchimp. Type: API Key Invalid. 
Details: Your API key may be invalid, or you've attempted to access the wrong datacenter.", - ), - ( - {"json": {"title": "Forbidden", "details": "You don't have permission to access this resource."}}, - "Encountered an error while connecting to Mailchimp. Type: Forbidden. Details: You don't have permission to access this resource.", - ), - ( - {"json": {}}, - "Encountered an error while connecting to Mailchimp. Type: Unknown Error. Details: An unknown error occurred. Please verify your credentials and try again.", - ), - ], - ids=["API Key Invalid", "Forbidden", "Unknown Error"], -) -def test_check_connection_error(requests_mock, config, data_center, response, expected_message): - requests_mock.register_uri("GET", f"https://{data_center}.api.mailchimp.com/3.0/ping", json=response["json"]) - ok, error_msg = SourceMailchimp().check_connection(logger, config=config) - - assert not ok - assert error_msg == expected_message - - -def test_get_oauth_data_center_ok(requests_mock, access_token, data_center): - responses = [ - {"json": {"dc": data_center}, "status_code": 200}, - ] - requests_mock.register_uri("GET", "https://login.mailchimp.com/oauth2/metadata", responses) - assert MailChimpAuthenticator().get_oauth_data_center(access_token) == data_center - - -def test_get_oauth_data_center_exception(requests_mock, access_token): - responses = [ - {"json": {}, "status_code": 200}, - {"json": {"error": "invalid_token"}, "status_code": 200}, - {"status_code": 403}, - ] - requests_mock.register_uri("GET", "https://login.mailchimp.com/oauth2/metadata", responses) - with pytest.raises(Exception): - MailChimpAuthenticator().get_oauth_data_center(access_token) - - -def test_oauth_config(requests_mock, oauth_config, data_center): - responses = [ - {"json": {"dc": data_center}, "status_code": 200}, - ] - requests_mock.register_uri("GET", "https://login.mailchimp.com/oauth2/metadata", responses) - assert MailChimpAuthenticator().get_auth(oauth_config) - - -def test_apikey_config(apikey_config): - assert MailChimpAuthenticator().get_auth(apikey_config) - - -def test_wrong_config(wrong_config): - with pytest.raises(Exception): - MailChimpAuthenticator().get_auth(wrong_config) - - -@pytest.mark.parametrize( - "config, expected_return", - [ - ({}, None), - ({"start_date": "2021-01-01T00:00:00.000Z"}, None), - ({"start_date": "2021-99-99T79:89:99.123Z"}, "The provided start date is not a valid date. Please check the date you input and try again."), - ({"start_date": "2021-01-01T00:00:00.000"}, "Please check the format of the start date against the pattern descriptor."), - ({"start_date": "2025-01-25T00:00:00.000Z"}, "The start date cannot be greater than the current date."), - ], - ids=[ - "No start date", - "Valid start date", - "Invalid start date", - "Invalid format", - "Future start date", - ] -) -def test_validate_start_date(config, expected_return): - source = SourceMailchimp() - result = source._validate_start_date(config) - assert result == expected_return - - -def test_streams_count(config): - streams = SourceMailchimp().streams(config) - assert len(streams) == 12 diff --git a/airbyte-integrations/connectors/source-mailchimp/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-mailchimp/unit_tests/test_streams.py deleted file mode 100644 index b441fe26f7b3e..0000000000000 --- a/airbyte-integrations/connectors/source-mailchimp/unit_tests/test_streams.py +++ /dev/null @@ -1,696 +0,0 @@ -# -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
-# - -import logging -from unittest.mock import MagicMock - -import pytest -import requests -import responses -from airbyte_cdk.models import SyncMode -from requests.exceptions import HTTPError -from source_mailchimp.streams import ( - Automations, - Campaigns, - EmailActivity, - InterestCategories, - Interests, - ListMembers, - Lists, - Reports, - SegmentMembers, - Segments, - Tags, - Unsubscribes, -) -from utils import read_full_refresh, read_incremental - - -@pytest.mark.parametrize( - "stream, endpoint", - [ - (Lists, "lists"), - (Campaigns, "campaigns"), - (Segments, "lists/123/segments"), - ], -) -def test_stream_read(requests_mock, auth, stream, endpoint): - args = {"authenticator": auth} - stream = stream(**args) - stream_responses = [ - { - "json": { - stream.data_field: [{"id": "test_id"}], - } - } - ] - stream_url = stream.url_base + endpoint - requests_mock.register_uri("GET", stream_url, stream_responses) - - # Mock the 'lists' endpoint as Segments stream_slice - lists_url = stream.url_base + "lists" - lists_response = {"json": {"lists": [{"id": "123"}]}} - requests_mock.register_uri("GET", lists_url, [lists_response]) - records = read_full_refresh(stream) - - assert records - - -def test_next_page_token(auth): - args = {"authenticator": auth} - stream = Lists(**args) - inputs = {"response": MagicMock()} - expected_token = None - assert stream.next_page_token(**inputs) == expected_token - - resp = {"lists": [{"id": i} for i in range(1001)]} - inputs = {"response": MagicMock(json=MagicMock(return_value=resp))} - expected_token = {"offset": 1000} - assert stream.next_page_token(**inputs) == expected_token - - -@pytest.mark.parametrize( - "stream, inputs, expected_params", - [ - ( - Lists, - {"stream_slice": None, "stream_state": None, "next_page_token": None}, - {"count": 1000, "sort_dir": "ASC", "sort_field": "date_created", "exclude_fields": "lists._links"}, - ), - ( - Lists, - {"stream_slice": None, "stream_state": None, "next_page_token": {"offset": 1000}}, - {"count": 1000, "sort_dir": "ASC", "sort_field": "date_created", "offset": 1000, "exclude_fields": "lists._links"}, - ), - ( - InterestCategories, - {"stream_slice": {"parent": {"id": "123"}}, "stream_state": None, "next_page_token": None}, - {"count": 1000, "exclude_fields": "categories._links"}, - ), - ( - Interests, - {"stream_slice": {"parent": {"id": "123"}}, "stream_state": None, "next_page_token": {"offset": 2000}}, - {"count": 1000, "exclude_fields": "interests._links", "offset": 2000}, - ), - ], - ids=[ - "Lists: no next_page_token or state to add to request params", - "Lists: next_page_token added to request params", - "InterestCategories: no next_page_token to add to request params", - "Interests: next_page_token added to request params", - ], -) -def test_request_params(auth, stream, inputs, expected_params): - args = {"authenticator": auth} - if stream == InterestCategories: - args["parent"] = Lists(**args) - elif stream == Interests: - args["parent"] = InterestCategories(authenticator=auth, parent=Lists(authenticator=auth)) - stream = stream(**args) - assert stream.request_params(**inputs) == expected_params - - -@pytest.mark.parametrize( - "current_state_stream, latest_record, expected_state", - [ - ({}, {"date_created": "2020-01-01"}, {"date_created": "2020-01-01"}), - ({"date_created": "2020-01-01"}, {"date_created": "2021-01-01"}, {"date_created": "2021-01-01"}), - ({"date_created": "2021-01-01"}, {"date_created": "2022-01-01"}, {"date_created": "2022-01-01"}), - ], -) -def test_get_updated_state(auth, 
current_state_stream, latest_record, expected_state): - args = {"authenticator": auth} - stream = Lists(**args) - - new_stream_state = stream.get_updated_state(current_state_stream, latest_record) - assert new_stream_state == expected_state - - -@responses.activate -def test_stream_teams_read(auth): - args = {"authenticator": auth} - stream = EmailActivity(**args) - stream_url = stream.url_base + "reports/123/email-activity" - campaigns_stream_url = stream.url_base + "campaigns" - responses.add("GET", campaigns_stream_url, json={"campaigns": [{"id": 123}]}) - - response = {"emails": [{"campaign_id": 123, "activity": [{"action": "q", "timestamp": "2021-08-24T14:15:22Z"}]}]} - responses.add("GET", stream_url, json=response) - records = read_incremental(stream, {}) - - assert records - assert records == [{"campaign_id": 123, "action": "q", "timestamp": "2021-08-24T14:15:22Z"}] - assert len(responses.calls) == 2 - - -@responses.activate -def test_stream_parse_json_error(auth, caplog): - args = {"authenticator": auth} - stream = EmailActivity(**args) - stream_url = stream.url_base + "reports/123/email-activity" - campaigns_stream_url = stream.url_base + "campaigns" - responses.add("GET", campaigns_stream_url, json={"campaigns": [{"id": 123}]}) - responses.add("GET", stream_url, body="not_valid_json") - read_incremental(stream, {}) - assert "response.content=b'not_valid_json'" in caplog.text - - -@pytest.mark.parametrize( - "stream_class, stream_slice, stream_state, next_page_token, expected_params", - [ - # Test case 1: no state, no next_page_token - ( - Segments, - {"list_id": "123"}, - {}, - None, - {"count": 1000, "sort_dir": "ASC", "sort_field": "updated_at", "list_id": "123", "exclude_fields": "segments._links"}, - ), - # Test case 2: state and next_page_token - ( - ListMembers, - {"list_id": "123", "since_last_changed": "2023-10-15T00:00:00Z"}, - {"123": {"last_changed": "2023-10-15T00:00:00Z"}}, - {"offset": 1000}, - { - "count": 1000, - "sort_dir": "ASC", - "sort_field": "last_changed", - "list_id": "123", - "offset": 1000, - "exclude_fields": "members._links", - "since_last_changed": "2023-10-15T00:00:00Z", - }, - ), - ], - ids=[ - "Segments: no next_page_token or state to add to request params", - "ListMembers: next_page_token and state filter added to request params", - ], -) -def test_list_child_request_params(auth, stream_class, stream_slice, stream_state, next_page_token, expected_params): - """ - Tests the request_params method for the shared MailChimpListSubStream class. 
- """ - stream = stream_class(authenticator=auth) - params = stream.request_params(stream_slice=stream_slice, stream_state=stream_state, next_page_token=next_page_token) - assert params == expected_params - - -@pytest.mark.parametrize( - "stream_class, current_stream_state,latest_record,expected_state", - [ - # Test case 1: current_stream_state is empty - (Segments, {}, {"list_id": "list_1", "updated_at": "2023-10-15T00:00:00Z"}, {"list_1": {"updated_at": "2023-10-15T00:00:00Z"}}), - # Test case 2: latest_record's cursor is higher than current_stream_state for list_1 and updates it - ( - Segments, - {"list_1": {"updated_at": "2023-10-14T00:00:00Z"}, "list_2": {"updated_at": "2023-10-15T00:00:00Z"}}, - {"list_id": "list_1", "updated_at": "2023-10-15T00:00:00Z"}, - {"list_1": {"updated_at": "2023-10-15T00:00:00Z"}, "list_2": {"updated_at": "2023-10-15T00:00:00Z"}}, - ), - # Test case 3: latest_record's cursor is lower than current_stream_state for list_2, no state update - ( - ListMembers, - {"list_1": {"last_changed": "2023-10-15T00:00:00Z"}, "list_2": {"last_changed": "2023-10-15T00:00:00Z"}}, - {"list_id": "list_2", "last_changed": "2023-10-14T00:00:00Z"}, - {"list_1": {"last_changed": "2023-10-15T00:00:00Z"}, "list_2": {"last_changed": "2023-10-15T00:00:00Z"}}, - ), - ( - SegmentMembers, - {"segment_1": {"last_changed": "2023-10-15T00:00:00Z"}, "segment_2": {"last_changed": "2023-10-15T00:00:00Z"}}, - {"segment_id": "segment_1", "last_changed": "2023-10-16T00:00:00Z"}, - {"segment_1": {"last_changed": "2023-10-16T00:00:00Z"}, "segment_2": {"last_changed": "2023-10-15T00:00:00Z"}}, - ), - ( - SegmentMembers, - {"segment_1": {"last_changed": "2023-10-15T00:00:00Z"}}, - {"segment_id": "segment_2", "last_changed": "2023-10-16T00:00:00Z"}, - {"segment_1": {"last_changed": "2023-10-15T00:00:00Z"}, "segment_2": {"last_changed": "2023-10-16T00:00:00Z"}}, - ) - ], - ids=[ - "Segments: no current_stream_state", - "Segments: latest_record's cursor > than current_stream_state for list_1", - "ListMembers: latest_record's cursor < current_stream_state for list_2", - "SegmentMembers: latest_record's cursor > current_stream_state for segment_1", - "SegmentMembers: no stream_state for current slice, new slice added to state" - ], -) -def test_list_child_get_updated_state(auth, stream_class, current_stream_state, latest_record, expected_state): - """ - Tests that the get_updated_state method for the shared MailChimpListSubStream class - correctly updates state only for its slice. 
- """ - segments_stream = stream_class(authenticator=auth) - updated_state = segments_stream.get_updated_state(current_stream_state, latest_record) - assert updated_state == expected_state - - -@pytest.mark.parametrize( - "stream_state, records, expected", - [ - # Test case 1: No stream state, all records should be yielded - ( - {}, - {"members": [ - {"id": 1, "segment_id": "segment_1", "last_changed": "2021-01-01T00:00:00Z"}, - {"id": 2, "segment_id": "segment_1", "last_changed": "2021-01-02T00:00:00Z"} - ]}, - [ - {"id": 1, "segment_id": "segment_1", "last_changed": "2021-01-01T00:00:00Z"}, - {"id": 2, "segment_id": "segment_1", "last_changed": "2021-01-02T00:00:00Z"} - ] - ), - - # Test case 2: Records older than stream state should be filtered out - ( - {"segment_1": {"last_changed": "2021-02-01T00:00:00Z"}}, - {"members": [ - {"id": 1, "segment_id": "segment_1", "last_changed": "2021-01-01T00:00:00Z"}, - {"id": 2, "segment_id": "segment_1", "last_changed": "2021-03-01T00:00:00Z"} - ]}, - [{"id": 2, "segment_id": "segment_1", "last_changed": "2021-03-01T00:00:00Z"}] - ), - - # Test case 3: Two lists in stream state, only state for segment_id_1 determines filtering - ( - {"segment_1": {"last_changed": "2021-01-02T00:00:00Z"}, "segment_2": {"last_changed": "2022-01-01T00:00:00Z"}}, - {"members": [ - {"id": 1, "segment_id": "segment_1", "last_changed": "2021-01-01T00:00:00Z"}, - {"id": 2, "segment_id": "segment_1", "last_changed": "2021-03-01T00:00:00Z"} - ]}, - [{"id": 2, "segment_id": "segment_1", "last_changed": "2021-03-01T00:00:00Z"}] - ), - ], - ids=[ - "No stream state, all records should be yielded", - "Record < stream state, should be filtered out", - "Record >= stream state, should be yielded", - ] -) -def test_segment_members_parse_response(auth, stream_state, records, expected): - segment_members_stream = SegmentMembers(authenticator=auth) - response = MagicMock() - response.json.return_value = records - parsed_records = list(segment_members_stream.parse_response(response, stream_state, stream_slice={"segment_id": "segment_1"})) - assert parsed_records == expected, f"Expected: {expected}, Actual: {parsed_records}" - - -@pytest.mark.parametrize( - "stream, record, expected_record", - [ - ( - SegmentMembers, - {"id": 1, "email_address": "a@gmail.com", "email_type": "html", "opt_timestamp": ""}, - {"id": 1, "email_address": "a@gmail.com", "email_type": "html", "opt_timestamp": None} - ), - ( - SegmentMembers, - {"id": 1, "email_address": "a@gmail.com", "email_type": "html", "opt_timestamp": "2022-01-01T00:00:00.000Z", "merge_fields": {"FNAME": "Bob", "LNAME": "", "ADDRESS": "", "PHONE": ""}}, - {"id": 1, "email_address": "a@gmail.com", "email_type": "html", "opt_timestamp": "2022-01-01T00:00:00.000Z", "merge_fields": {"FNAME": "Bob", "LNAME": None, "ADDRESS": None, "PHONE": None}} - ), - ( - Campaigns, - {"id": "1", "web_id": 2, "email_type": "html", "create_time": "2022-01-01T00:00:00.000Z", "send_time": ""}, - {"id": "1", "web_id": 2, "email_type": "html", "create_time": "2022-01-01T00:00:00.000Z", "send_time": None} - ), - ( - Reports, - {"id": "1", "type": "rss", "clicks": {"clicks_total": 1, "last_click": "2022-01-01T00:00:00Z"}, "opens": {"opens_total": 0, "last_open": ""}}, - {"id": "1", "type": "rss", "clicks": {"clicks_total": 1, "last_click": "2022-01-01T00:00:00Z"}, "opens": {"opens_total": 0, "last_open": None}} - ), - ( - Lists, - {"id": "1", "name": "Santa's List", "stats": {"last_sub_date": "2022-01-01T00:00:00Z", "last_unsub_date": ""}}, - {"id": "1", "name": 
"Santa's List", "stats": {"last_sub_date": "2022-01-01T00:00:00Z", "last_unsub_date": None}} - ) - ], - ids=[ - "segment_members: opt_timestamp nullified", - "segment_members: nested merge_fields nullified", - "campaigns: send_time nullified", - "reports: nested opens.last_open nullified", - "lists: stats.last_unsub_date nullified" - ] -) -def test_filter_empty_fields(auth, stream, record, expected_record): - """ - Tests that empty string values are converted to None. - """ - stream = stream(authenticator=auth) - assert stream.filter_empty_fields(record) == expected_record - - -def test_unsubscribes_stream_slices(requests_mock, unsubscribes_stream, campaigns_stream, mock_campaigns_response): - campaigns_url = campaigns_stream.url_base + campaigns_stream.path() - requests_mock.register_uri("GET", campaigns_url, json={"campaigns": mock_campaigns_response}) - - expected_slices = [{"campaign_id": "campaign_1"}, {"campaign_id": "campaign_2"}, {"campaign_id": "campaign_3"}] - slices = list(unsubscribes_stream.stream_slices(sync_mode=SyncMode.incremental)) - assert slices == expected_slices - - -@pytest.mark.parametrize( - "stream_state, expected_records", - [ - ( # Test case 1: all records >= state - {"campaign_1": {"timestamp": "2022-01-01T00:00:00Z"}}, - [ - {"campaign_id": "campaign_1", "email_id": "email_1", "timestamp": "2022-01-02T00:00:00Z"}, - {"campaign_id": "campaign_1", "email_id": "email_2", "timestamp": "2022-01-02T00:00:00Z"}, - {"campaign_id": "campaign_1", "email_id": "email_3", "timestamp": "2022-01-01T00:00:00Z"}, - {"campaign_id": "campaign_1", "email_id": "email_4", "timestamp": "2022-01-03T00:00:00Z"}, - ], - ), - ( # Test case 2: one record < state - {"campaign_1": {"timestamp": "2022-01-02T00:00:00Z"}}, - [ - {"campaign_id": "campaign_1", "email_id": "email_1", "timestamp": "2022-01-02T00:00:00Z"}, - {"campaign_id": "campaign_1", "email_id": "email_2", "timestamp": "2022-01-02T00:00:00Z"}, - {"campaign_id": "campaign_1", "email_id": "email_4", "timestamp": "2022-01-03T00:00:00Z"}, - ], - ), - ( # Test case 3: one record >= state - {"campaign_1": {"timestamp": "2022-01-03T00:00:00Z"}}, - [ - {"campaign_id": "campaign_1", "email_id": "email_4", "timestamp": "2022-01-03T00:00:00Z"}, - ], - ), - ( # Test case 4: no state, all records returned - {}, - [ - {"campaign_id": "campaign_1", "email_id": "email_1", "timestamp": "2022-01-02T00:00:00Z"}, - {"campaign_id": "campaign_1", "email_id": "email_2", "timestamp": "2022-01-02T00:00:00Z"}, - {"campaign_id": "campaign_1", "email_id": "email_3", "timestamp": "2022-01-01T00:00:00Z"}, - {"campaign_id": "campaign_1", "email_id": "email_4", "timestamp": "2022-01-03T00:00:00Z"}, - ], - ), - ], - ids=[ - "all records >= state", - "one record < state", - "one record >= state", - "no state, all records returned", - ], -) -def test_parse_response(stream_state, expected_records, unsubscribes_stream): - mock_response = MagicMock(spec=requests.Response) - mock_response.json.return_value = { - "unsubscribes": [ - {"campaign_id": "campaign_1", "email_id": "email_1", "timestamp": "2022-01-02T00:00:00Z"}, - {"campaign_id": "campaign_1", "email_id": "email_2", "timestamp": "2022-01-02T00:00:00Z"}, - {"campaign_id": "campaign_1", "email_id": "email_3", "timestamp": "2022-01-01T00:00:00Z"}, - {"campaign_id": "campaign_1", "email_id": "email_4", "timestamp": "2022-01-03T00:00:00Z"}, - ] - } - stream_slice = {"campaign_id": "campaign_1"} - records = list(unsubscribes_stream.parse_response(response=mock_response, stream_slice=stream_slice, 
stream_state=stream_state)) - assert records == expected_records - - -@pytest.mark.parametrize( - "latest_record, expected_updated_state", - [ - # Test case 1: latest_record > and updates the state of campaign_1 - ( - { - "email_id": "email_1", - "email_address": "address1@email.io", - "reason": "None given", - "timestamp": "2022-01-05T00:00:00Z", - "campaign_id": "campaign_1", - }, - { - "campaign_1": {"timestamp": "2022-01-05T00:00:00Z"}, - "campaign_2": {"timestamp": "2022-01-02T00:00:00Z"}, - "campaign_3": {"timestamp": "2022-01-03T00:00:00Z"}, - }, - ), - # Test case 2: latest_record > and updates the state of campaign_2 - ( - { - "email_id": "email_2", - "email_address": "address2@email.io", - "reason": "Inappropriate content", - "timestamp": "2022-01-05T00:00:00Z", - "campaign_id": "campaign_2", - }, - { - "campaign_1": {"timestamp": "2022-01-01T00:00:00Z"}, - "campaign_2": {"timestamp": "2022-01-05T00:00:00Z"}, - "campaign_3": {"timestamp": "2022-01-03T00:00:00Z"}, - }, - ), - # Test case 3: latest_record < and does not update the state of campaign_3 - ( - { - "email_id": "email_3", - "email_address": "address3@email.io", - "reason": "No longer interested", - "timestamp": "2021-01-01T00:00:00Z", - "campaign_id": "campaign_3", - }, - { - "campaign_1": {"timestamp": "2022-01-01T00:00:00Z"}, - "campaign_2": {"timestamp": "2022-01-02T00:00:00Z"}, - "campaign_3": {"timestamp": "2022-01-03T00:00:00Z"}, - }, - ), - # Test case 4: latest_record sets state campaign_4 - ( - { - "email_id": "email_4", - "email_address": "address4@email.io", - "reason": "No longer interested", - "timestamp": "2022-01-04T00:00:00Z", - "campaign_id": "campaign_4", - }, - { - "campaign_1": {"timestamp": "2022-01-01T00:00:00Z"}, - "campaign_2": {"timestamp": "2022-01-02T00:00:00Z"}, - "campaign_3": {"timestamp": "2022-01-03T00:00:00Z"}, - "campaign_4": {"timestamp": "2022-01-04T00:00:00Z"}, - }, - ), - ], - ids=[ - "latest_record > and updates the state of campaign_1", - "latest_record > and updates the state of campaign_2", - "latest_record < and does not update the state of campaign_3", - "latest_record sets state of campaign_4", - ], -) -def test_unsubscribes_get_updated_state(unsubscribes_stream, mock_unsubscribes_state, latest_record, expected_updated_state): - updated_state = unsubscribes_stream.get_updated_state(mock_unsubscribes_state, latest_record) - assert updated_state == expected_updated_state - - -@pytest.mark.parametrize( - "stream,url,status_code,response_content,expected_availability,expected_reason_substring", - [ - ( - Campaigns, - "https://some_dc.api.mailchimp.com/3.0/campaigns", - 403, - b'{"object": "error", "status": 403, "code": "restricted_resource"}', - False, - "Unable to read campaigns stream", - ), - ( - EmailActivity, - "https://some_dc.api.mailchimp.com/3.0/reports/123/email-activity", - 403, - b'{"object": "error", "status": 403, "code": "restricted_resource"}', - False, - "Unable to read email_activity stream", - ), - ( - Lists, - "https://some_dc.api.mailchimp.com/3.0/lists", - 200, - b'{ "lists": [{"id": "123", "date_created": "2022-01-01T00:00:00+000"}]}', - True, - None, - ), - ( - Lists, - "https://some_dc.api.mailchimp.com/3.0/lists", - 400, - b'{ "object": "error", "status": 404, "code": "invalid_action"}', - False, - None, - ), - ], - ids=[ - "Campaigns 403 error", - "EmailActivity 403 error", - "Lists 200 success", - "Lists 400 error", - ], -) -def test_403_error_handling( - auth, requests_mock, stream, url, status_code, response_content, expected_availability, 
expected_reason_substring -): - """ - Test that availability strategy flags streams with 403 error as unavailable - and returns appropriate message. - """ - - requests_mock.get(url=url, status_code=status_code, content=response_content) - - stream = stream(authenticator=auth) - - if stream.__class__.__name__ == "EmailActivity": - stream.stream_slices = MagicMock(return_value=[{"campaign_id": "123"}]) - - try: - is_available, reason = stream.check_availability(logger=logging.Logger, source=MagicMock()) - - assert is_available is expected_availability - - if expected_reason_substring: - assert expected_reason_substring in reason - else: - assert reason is None - - # Handle non-403 error - except HTTPError as e: - assert e.response.status_code == status_code - - -@pytest.mark.parametrize( - "stream, stream_slice, expected_endpoint", - [ - (Automations, {}, "automations"), - (Lists, {}, "lists"), - (Campaigns, {}, "campaigns"), - (EmailActivity, {"campaign_id": "123"}, "reports/123/email-activity"), - (InterestCategories, {"parent": {"id": "123"}}, "lists/123/interest-categories"), - (Interests, {"parent": {"list_id": "123", "id": "456"}}, "lists/123/interest-categories/456/interests"), - (ListMembers, {"list_id": "123"}, "lists/123/members"), - (Reports, {}, "reports"), - (SegmentMembers, {"list_id": "123", "segment_id": "456"}, "lists/123/segments/456/members"), - (Segments, {"list_id": "123"}, "lists/123/segments"), - (Tags, {"parent": {"id": "123"}}, "lists/123/tag-search"), - (Unsubscribes, {"campaign_id": "123"}, "reports/123/unsubscribed"), - ], - ids=[ - "Automations", - "Lists", - "Campaigns", - "EmailActivity", - "InterestCategories", - "Interests", - "ListMembers", - "Reports", - "SegmentMembers", - "Segments", - "Tags", - "Unsubscribes", - ], -) -def test_path(auth, stream, stream_slice, expected_endpoint): - """ - Test the path method for each stream. 
- """ - - # Add parent stream where necessary - if stream is InterestCategories or stream is Tags: - stream = stream(authenticator=auth, parent=Lists(authenticator=auth)) - elif stream is Interests: - stream = stream(authenticator=auth, parent=InterestCategories(authenticator=auth, parent=Lists(authenticator=auth))) - else: - stream = stream(authenticator=auth) - - endpoint = stream.path(stream_slice=stream_slice) - - assert endpoint == expected_endpoint, f"Stream {stream}: expected path '{expected_endpoint}', got '{endpoint}'" - - -@pytest.mark.parametrize( - "start_date, state_date, expected_return_value", - [ - ( - "2021-01-01T00:00:00.000Z", - "2020-01-01T00:00:00+00:00", - "2021-01-01T00:00:00Z" - ), - ( - "2021-01-01T00:00:00.000Z", - "2023-10-05T00:00:00+00:00", - "2023-10-05T00:00:00+00:00" - ), - ( - None, - "2022-01-01T00:00:00+00:00", - "2022-01-01T00:00:00+00:00" - ), - ( - "2020-01-01T00:00:00.000Z", - None, - "2020-01-01T00:00:00Z" - ), - ( - None, - None, - None - ) - ] -) -def test_get_filter_date(auth, start_date, state_date, expected_return_value): - """ - Tests that the get_filter_date method returns the correct date string - """ - stream = Campaigns(authenticator=auth, start_date=start_date) - result = stream.get_filter_date(start_date, state_date) - assert result == expected_return_value, f"Expected: {expected_return_value}, Actual: {result}" - - -@pytest.mark.parametrize( - "stream_class, records, filter_date, expected_return_value", - [ - ( - Unsubscribes, - [ - {"campaign_id": "campaign_1", "email_id": "email_1", "timestamp": "2022-01-02T00:00:00Z"}, - {"campaign_id": "campaign_1", "email_id": "email_2", "timestamp": "2022-01-04T00:00:00Z"}, - {"campaign_id": "campaign_1", "email_id": "email_3", "timestamp": "2022-01-03T00:00:00Z"}, - {"campaign_id": "campaign_1", "email_id": "email_4", "timestamp": "2022-01-01T00:00:00Z"}, - ], - "2022-01-02T12:00:00+00:00", - [ - {"campaign_id": "campaign_1", "email_id": "email_2", "timestamp": "2022-01-04T00:00:00Z"}, - {"campaign_id": "campaign_1", "email_id": "email_3", "timestamp": "2022-01-03T00:00:00Z"}, - ], - ), - ( - SegmentMembers, - [ - {"id": 1, "segment_id": "segment_1", "last_changed": "2021-01-04T00:00:00Z"}, - {"id": 2, "segment_id": "segment_1", "last_changed": "2021-01-01T00:00:00Z"}, - {"id": 3, "segment_id": "segment_1", "last_changed": "2021-01-03T00:00:00Z"}, - {"id": 4, "segment_id": "segment_1", "last_changed": "2021-01-02T00:00:00Z"}, - ], - None, - [ - {"id": 1, "segment_id": "segment_1", "last_changed": "2021-01-04T00:00:00Z"}, - {"id": 2, "segment_id": "segment_1", "last_changed": "2021-01-01T00:00:00Z"}, - {"id": 3, "segment_id": "segment_1", "last_changed": "2021-01-03T00:00:00Z"}, - {"id": 4, "segment_id": "segment_1", "last_changed": "2021-01-02T00:00:00Z"}, - ], - ) - ], - ids=[ - "Unsubscribes: filter_date is set, records filtered", - "SegmentMembers: filter_date is None, all records returned" - ] -) -def test_filter_old_records(auth, stream_class, records, filter_date, expected_return_value): - """ - Tests the logic for filtering old records in streams that do not support query_param filtering. 
- """ - stream = stream_class(authenticator=auth) - filtered_records = list(stream.filter_old_records(records, filter_date)) - assert filtered_records == expected_return_value diff --git a/airbyte-integrations/connectors/source-mailchimp/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-mailchimp/unit_tests/unit_test.py deleted file mode 100644 index 0371987d8762a..0000000000000 --- a/airbyte-integrations/connectors/source-mailchimp/unit_tests/unit_test.py +++ /dev/null @@ -1,13 +0,0 @@ -# -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. -# - - -from airbyte_cdk.logger import AirbyteLogger -from source_mailchimp import SourceMailchimp - - -def test_client_wrong_credentials(): - source = SourceMailchimp() - status, error = source.check_connection(logger=AirbyteLogger, config={"username": "Jonny", "apikey": "blah-blah"}) - assert not status diff --git a/docs/integrations/sources/mailchimp-migrations.md b/docs/integrations/sources/mailchimp-migrations.md index c236c549eef2f..d09683b8e643b 100644 --- a/docs/integrations/sources/mailchimp-migrations.md +++ b/docs/integrations/sources/mailchimp-migrations.md @@ -1,5 +1,34 @@ # Mailchimp Migration Guide +## Upgrading to 2.0.0 + +Version 2.0.0 introduces changes in primary key for streams `Segment Members` and `List Members`. + +## Migration Steps + +### Refresh affected schemas and reset data + +1. Select **Connections** in the main nav bar. + 1. Select the connection(s) affected by the update. +2. Select the **Replication** tab. + 1. Select **Refresh source schema**. + 2. Select **OK**. +:::note +Any detected schema changes will be listed for your review. +::: +3. Select **Save changes** at the bottom of the page. + 1. Ensure the **Reset affected streams** option is checked. +:::note +Depending on destination type you may not be prompted to reset your data. +::: +4. Select **Save connection**. +:::note +This will reset the data in your destination and initiate a fresh sync. +::: + +For more information on resetting your data in Airbyte, see [this page](https://docs.airbyte.com/operator-guides/reset). + + ## Upgrading to 1.0.0 Version 1.0.0 of the Source Mailchimp connector introduces a number of breaking changes to the schemas of all incremental streams. A full schema refresh and data reset are required when upgrading to this version. diff --git a/docs/integrations/sources/mailchimp.md b/docs/integrations/sources/mailchimp.md index 6db34dce863c0..c45fd006c7a0b 100644 --- a/docs/integrations/sources/mailchimp.md +++ b/docs/integrations/sources/mailchimp.md @@ -122,7 +122,8 @@ Now that you have set up the Mailchimp source connector, check out the following ## Changelog | Version | Date | Pull Request | Subject | -| ------- | ---------- | -------------------------------------------------------- | -------------------------------------------------------------------------- | +|---------|------------|----------------------------------------------------------|----------------------------------------------------------------------------| +| 2.0.0 | 2024-04-01 | [35281](https://github.com/airbytehq/airbyte/pull/35281) | Migrate to Low-Code | | 1.2.0 | 2024-03-28 | [36600](https://github.com/airbytehq/airbyte/pull/36600) | Migrate to latest Airbyte-CDK. | | 1.1.2 | 2024-02-09 | [35092](https://github.com/airbytehq/airbyte/pull/35092) | Manage dependencies with Poetry. | | 1.1.1 | 2024-01-11 | [34157](https://github.com/airbytehq/airbyte/pull/34157) | Prepare for airbyte-lib |