Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Source Mailchimp: Add optional start_date to config #32852

Merged
merged 42 commits into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
47a6a5a
add start date to config
ChristoGrab Nov 16, 2023
b611729
Source Hubspot: fix expected records (#32645)
roman-yermilov-gl Nov 20, 2023
a9cc2b9
Source Github: fix expected records (#32644)
roman-yermilov-gl Nov 20, 2023
e74bbde
✨ Source Mailchimp: Add Interests, InterestCategories, Tags streams (…
ChristoGrab Nov 20, 2023
918e64c
🐛 Source Pinterest: Fix backoff waiting time (#32672)
tolik0 Nov 21, 2023
0f28144
🚨🚨🐛 Source Pinterest: Update date-time fields with airbyte_type: time…
tolik0 Nov 21, 2023
39ac79c
remove bq+snowflake from legacy normalization docs (#32708)
edgao Nov 21, 2023
667be92
🐛 Source Pinterest: Fix Advertiser stream names (#32734)
tolik0 Nov 23, 2023
059ca2f
✨ Source Pinterest: Update docs and spec; add missing `placement_traf…
tolik0 Nov 23, 2023
d81ae88
🐛 Source Facebook Marketing: Removed validation that blocked personal…
tolik0 Nov 23, 2023
74d359d
Docs: Add permissions to prereqs in Source Facebook Marketing (#32653)
ChristoGrab Nov 24, 2023
3e69945
✨ Source Mailchimp: Implement SegmentMembers stream (#32782)
ChristoGrab Nov 27, 2023
4cdfc7a
Source My Hours: Update CDK (#32680)
ChristoGrab Nov 27, 2023
428e1b3
add logic for use of start_date in incremental streams
ChristoGrab Nov 27, 2023
b5d0ede
Source Cart: Update CDK to Latest Version (#32705)
pnilan Nov 27, 2023
86848a6
Source Twilio: Increase test coverage, fix parse_response bug, update…
pnilan Nov 27, 2023
1fda1df
Source Sendgrid: Increase Test Coverage, Update Expected Records (#32…
pnilan Nov 27, 2023
941d937
fix method and add unit test
ChristoGrab Nov 27, 2023
d17e251
version bump
ChristoGrab Nov 27, 2023
b2a546e
small edits
ChristoGrab Nov 27, 2023
1a0bb33
Automated Commit - Formatting Changes
ChristoGrab Nov 27, 2023
a660279
✨Source Amazon Seller Partner: multiple updates (#32833)
Nov 28, 2023
00a896b
add note to docs
ChristoGrab Nov 28, 2023
05c5acb
Merge branch 'christo/mailchimp-start-date' of https://github.com/air…
ChristoGrab Nov 28, 2023
88b24cf
Merge remote-tracking branch 'origin/master' into dev
git-phu Nov 28, 2023
604a2ab
Merge branch 'dev' of https://github.com/airbytehq/airbyte into chris…
ChristoGrab Nov 28, 2023
c28ca4d
Merge from master
ChristoGrab Nov 28, 2023
04a9d68
chore: fix duplicated docs section
ChristoGrab Nov 28, 2023
a0c3516
Merge branch 'master' into christo/mailchimp-start-date
ChristoGrab Nov 28, 2023
a1abc41
Merge branch 'master' of https://github.com/airbytehq/airbyte into ch…
ChristoGrab Dec 5, 2023
4b524b3
Merge branch 'master' of https://github.com/airbytehq/airbyte into ch…
ChristoGrab Dec 7, 2023
66da4fe
fix: refactor implementation of start_date
ChristoGrab Dec 7, 2023
47366b2
refactor unit tests
ChristoGrab Dec 8, 2023
f52d5af
updated expected_records
ChristoGrab Dec 8, 2023
a400070
Automated Commit - Formatting Changes
ChristoGrab Dec 8, 2023
0195913
chore: fix merge conflict
ChristoGrab Dec 8, 2023
4814279
Merge branch 'master' into christo/mailchimp-start-date
ChristoGrab Dec 11, 2023
918388d
Merge branch 'master' into christo/mailchimp-start-date
ChristoGrab Dec 11, 2023
b7d30b7
merge changes from update to 1.0.0
ChristoGrab Dec 19, 2023
2b4b74f
Merge branch 'christo/mailchimp-start-date' of https://github.com/air…
ChristoGrab Dec 19, 2023
f6953e3
add separate method for client-side filter
ChristoGrab Dec 20, 2023
aa2b6de
chore: format fix
ChristoGrab Dec 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: b03a9f3e-22a5-11eb-adc1-0242ac120002
dockerImageTag: 0.10.0
dockerImageTag: 0.11.0
dockerRepository: airbyte/source-mailchimp
documentationUrl: https://docs.airbyte.com/integrations/sources/mailchimp
githubIssueLabel: source-mailchimp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@


import base64
import re
from typing import Any, List, Mapping, Tuple

import pendulum
import requests
from airbyte_cdk import AirbyteLogger
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator
from pendulum.parsing.exceptions import ParserError
from requests.auth import AuthBase

from .streams import (
Expand Down Expand Up @@ -78,7 +81,30 @@ def get_auth(self, config: Mapping[str, Any]) -> AuthBase:


class SourceMailchimp(AbstractSource):
def _validate_start_date(self, config: Mapping[str, Any]):
start_date = config.get("start_date")

if start_date:
pattern = re.compile(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z")
if not pattern.match(start_date): # Compare against the pattern descriptor.
return "Please check the format of the start date against the pattern descriptor."

try: # Handle invalid dates.
parsed_start_date = pendulum.parse(start_date)
except ParserError:
return "The provided start date is not a valid date. Please check the date you input and try again."

if parsed_start_date > pendulum.now("UTC"): # Handle future start date.
return "The start date cannot be greater than the current date."

return None

def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
# First, check for a valid start date if it is provided
start_date_validation_error = self._validate_start_date(config)
if start_date_validation_error:
return False, start_date_validation_error

try:
authenticator = MailChimpAuthenticator().get_auth(config)
response = requests.get(
Expand All @@ -102,21 +128,22 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
authenticator = MailChimpAuthenticator().get_auth(config)
campaign_id = config.get("campaign_id")
start_date = config.get("start_date")

lists = Lists(authenticator=authenticator)
lists = Lists(authenticator=authenticator, start_date=start_date)
interest_categories = InterestCategories(authenticator=authenticator, parent=lists)

return [
Automations(authenticator=authenticator),
Campaigns(authenticator=authenticator),
EmailActivity(authenticator=authenticator, campaign_id=campaign_id),
Automations(authenticator=authenticator, start_date=start_date),
Campaigns(authenticator=authenticator, start_date=start_date),
EmailActivity(authenticator=authenticator, start_date=start_date, campaign_id=campaign_id),
interest_categories,
Interests(authenticator=authenticator, parent=interest_categories),
lists,
ListMembers(authenticator=authenticator),
Reports(authenticator=authenticator),
SegmentMembers(authenticator=authenticator),
Segments(authenticator=authenticator),
ListMembers(authenticator=authenticator, start_date=start_date),
Reports(authenticator=authenticator, start_date=start_date),
SegmentMembers(authenticator=authenticator, start_date=start_date),
Segments(authenticator=authenticator, start_date=start_date),
Tags(authenticator=authenticator, parent=lists),
Unsubscribes(authenticator=authenticator, campaign_id=campaign_id),
Unsubscribes(authenticator=authenticator, start_date=start_date, campaign_id=campaign_id),
]
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@
}
]
},
"start_date": {
"title": "Incremental Sync Start Date",
"description": "The date from which you want to start syncing data for Incremental streams. Only records that have been created or modified since this date will be synced. If left blank, all data will by synced.",
"type": "string",
"format": "date-time",
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{3}Z$",
"pattern_descriptor": "YYYY-MM-DDTHH:MM:SS.000Z",
"examples": ["2020-01-01T00:00:00.000Z"]
},
"campaign_id": {
"type": "string",
"title": "ID of a campaign to sync email activities",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from abc import ABC, abstractmethod
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional

import pendulum
import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.core import StreamData
Expand Down Expand Up @@ -80,6 +81,10 @@ def read_records(
class IncrementalMailChimpStream(MailChimpStream, ABC):
state_checkpoint_interval = math.inf

def __init__(self, **kwargs):
self.start_date = kwargs.pop("start_date", None)
super().__init__(**kwargs)

@property
@abstractmethod
def cursor_field(self) -> str:
Expand Down Expand Up @@ -124,6 +129,24 @@ def request_params(self, stream_state=None, stream_slice=None, **kwargs):
params.update(default_params)
return params

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
"""
Mailchimp endpoints do not always support filtering by date,
so we should filter out records manually against the start_date as a fallback.
"""
response = super().parse_response(response, **kwargs)
start_date = pendulum.parse(self.start_date) if self.start_date else None

if start_date:
for record in response:
parsed_date = pendulum.parse(record.get(self.cursor_field))
if parsed_date >= start_date:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we need filter records on our side if we already use slicing with default_params and sending request with specified range in query params?

Copy link
Contributor Author

@ChristoGrab ChristoGrab Nov 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The query params used for server-side sorting and filtering are not supported by all Mailchimp endpoints, so certain streams (such as SegmentMembers and Unsubscribes) require us to add any filtering logic on our end regardless. My thinking was that by applying our own filter here we ensure the consistent usage of start_date across all streams.

Another option would be to incorporate the start_date in the server-side params by default and add a helper method with our own filtering logic that is only invoked by streams where it is necessary. This would be more resource efficient, but my one concern with this solution is that it could lead to more costly maintenance and debugging down the line if/when new streams are implemented that do not support the filtering params. Let me know if you still think this is a more sensible approach, or if there's another solution you would recommend!

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I observed two aspects that need to be addressed separately. Firstly, we should establish the optional nature of the start_date field, and then we can strategize on introducing an incremental approach, which is currently unsupported.

As for the first aspect, I suggest defining a default value for cases where the start_date is not provided. If we could retrieves all data without applying any filtering query params let's do this, but if to get any data we must specify it let it be some default value. Based on my experience, a reasonable default could be to fetch data from two years ago starting from the date of the first synchronization.

Copy link
Contributor Author

@ChristoGrab ChristoGrab Dec 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lazebnyi I've refactored the implementation of the start_date filtering to align with the existing use of slicing in request params as you noted. The start_date is now compared against the current slice's cursor value to determine which will be used as the filter in request params.

For the Segment Members and Unsubscribes streams the filtering still takes place during the response parsing instead, since their endpoints do not support the filtering param. One thing I did choose to leave out is a default start_date value, as the API does allow us retrieve all data by not including a filter param in the request. Provided no cursor value exists yet for the current slice and no start_date is set in the config, the param is left empty and all data is fetched.

yield record
# If no start date is provided, return all records
else:
for record in response:
yield record


class MailChimpListSubStream(IncrementalMailChimpStream):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def data_center_fixture():

@fixture(name="config")
def config_fixture(data_center):
return {"apikey": f"API_KEY-{data_center}"}
return {"apikey": f"API_KEY-{data_center}", "start_date": "2022-01-01T00:00:00.000Z"}


@fixture(name="access_token")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,29 @@ def test_wrong_config(wrong_config):
MailChimpAuthenticator().get_auth(wrong_config)


@pytest.mark.parametrize(
"config, expected_return",
[
({}, None),
({"start_date": "2021-01-01T00:00:00.000Z"}, None),
({"start_date": "2021-99-99T79:89:99.123Z"}, "The provided start date is not a valid date. Please check the date you input and try again."),
({"start_date": "2021-01-01T00:00:00.000"}, "Please check the format of the start date against the pattern descriptor."),
({"start_date": "2025-01-25T00:00:00.000Z"}, "The start date cannot be greater than the current date."),
],
ids=[
"No start date",
"Valid start date",
"Invalid start date",
"Invalid format",
"Future start date",
]
)
def test_validate_start_date(config, expected_return):
source = SourceMailchimp()
result = source._validate_start_date(config)
assert result == expected_return


def test_streams_count(config):
streams = SourceMailchimp().streams(config)
assert len(streams) == 12
Original file line number Diff line number Diff line change
Expand Up @@ -629,4 +629,29 @@ def test_reports_remove_empty_datetime_fields(auth, record, expected_return):
"""
stream = Reports(authenticator=auth)
assert stream.remove_empty_datetime_fields(record) == expected_return, f"Expected: {expected_return}, Actual: {stream.remove_empty_datetime_fields(record)}"



@pytest.mark.parametrize("start_date, expected_ids", [
(None, [1, 2, 3]),
("2022-01-01T00:00:00.000Z", [1, 3]),
("2100-01-01T00:00:00.000Z", [])
],
ids=[
"No start_date: all records read",
"start_date provided: only records >= start_date read",
"Start date > records: no records read"
]
)
def test_incremental_start_date(auth, start_date, expected_ids, requests_mock):
mock_data = [
{"id": 1, "create_time": "2022-01-05T00:00:00.000Z"},
{"id": 2, "create_time": "2021-12-25T00:00:00.000Z"},
{"id": 3, "create_time": "2022-01-01T00:00:00.000Z"},
]
stream = Campaigns(authenticator=auth, start_date=start_date)

response = MagicMock()
response.json.return_value = {"campaigns": mock_data}
records = list(stream.parse_response(response))

assert [record["id"] for record in records] == expected_ids
9 changes: 8 additions & 1 deletion docs/integrations/sources/mailchimp.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@ You can use [OAuth](https://mailchimp.com/developer/marketing/guides/access-user
3. On the Set up the source page, select **Mailchimp** from the Source type dropdown.
4. Enter a name for your source.

6. You can use OAuth or an API key to authenticate your Mailchimp account. We recommend using OAuth for Airbyte Cloud and an API key for Airbyte Open Source.
5. You can use OAuth or an API key to authenticate your Mailchimp account. We recommend using OAuth for Airbyte Cloud and an API key for Airbyte Open Source.
- To authenticate using OAuth for Airbyte Cloud, ensure you have [registered your Mailchimp account](#prerequisite) and then click **Authenticate your Mailchimp account** to sign in with Mailchimp and authorize your account.
- To authenticate using an API key for Airbyte Open Source, select **API key** from the Authentication dropdown and enter the [API key](https://mailchimp.com/developer/marketing/guides/quick-start/#generate-your-api-key) for your Mailchimp account.
:::note
Check the [performance considerations](#performance-considerations) before using an API key.
:::
6. (Optional) You may optionally provide an **Incremental Sync Start Date** using the provided datepicker, or by programmatically entering a UTC date-time in the format `YYYY-MM-DDThh:mm:ss.sssZ`. If set, only data generated on or after the configured date-time will be synced for Incremental streams. Leaving this field blank will sync all data returned from the API. Please note that this option has no effect on streams using Full Refresh sync mode.
7. Click **Set up source**.

## Supported sync modes
Expand All @@ -37,12 +38,17 @@ The Mailchimp source connector supports the following streams:
[Email Activity](https://mailchimp.com/developer/marketing/api/email-activity-reports/list-email-activity/)
[Interests](https://mailchimp.com/developer/marketing/api/interests/list-interests-in-category/)
[Interest Categories](https://mailchimp.com/developer/marketing/api/interest-categories/list-interest-categories/)
[Interests](https://mailchimp.com/developer/marketing/api/interests/list-interests-in-category/)
[Interest Categories](https://mailchimp.com/developer/marketing/api/interest-categories/list-interest-categories/)
[Lists](https://mailchimp.com/developer/api/marketing/lists/get-list-info)
[List Members](https://mailchimp.com/developer/marketing/api/list-members/list-members-info/)
[List Members](https://mailchimp.com/developer/marketing/api/list-members/list-members-info/)
[Reports](https://mailchimp.com/developer/marketing/api/reports/list-campaign-reports/)
[Segments](https://mailchimp.com/developer/marketing/api/list-segments/list-segments/)
[Segment Members](https://mailchimp.com/developer/marketing/api/list-segment-members/list-members-in-segment/)
[Tags](https://mailchimp.com/developer/marketing/api/lists-tags-search/search-for-tags-on-a-list-by-name/)
[Segment Members](https://mailchimp.com/developer/marketing/api/list-segment-members/list-members-in-segment/)
[Tags](https://mailchimp.com/developer/marketing/api/lists-tags-search/search-for-tags-on-a-list-by-name/)
[Unsubscribes](https://mailchimp.com/developer/marketing/api/unsub-reports/list-unsubscribed-members/)

### A note on primary keys
Expand Down Expand Up @@ -78,6 +84,7 @@ Now that you have set up the Mailchimp source connector, check out the following

| Version | Date | Pull Request | Subject |
|---------|------------|----------------------------------------------------------|----------------------------------------------------------------------------|
| 0.11.0 | 2023-11-27 | [32852](https://github.com/airbytehq/airbyte/pull/32852) | Add optional start_date for incremental streams |
| 0.10.0 | 2023-11-23 | [32782](https://github.com/airbytehq/airbyte/pull/32782) | Add SegmentMembers stream |
| 0.9.0 | 2023-11-17 | [32218](https://github.com/airbytehq/airbyte/pull/32218) | Add Interests, InterestCategories, Tags streams |
| 0.8.3 | 2023-11-15 | [32543](https://github.com/airbytehq/airbyte/pull/32543) | Handle empty datetime fields in Reports stream |
Expand Down
Loading