-
Notifications
You must be signed in to change notification settings - Fork 3.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
✨ Source Mailchimp: Add optional start_date to config #32852
Changes from 40 commits
47a6a5a
b611729
a9cc2b9
e74bbde
918e64c
0f28144
39ac79c
667be92
059ca2f
d81ae88
74d359d
3e69945
4cdfc7a
428e1b3
b5d0ede
86848a6
1fda1df
941d937
d17e251
b2a546e
1a0bb33
a660279
00a896b
05c5acb
88b24cf
604a2ab
c28ca4d
04a9d68
a0c3516
a1abc41
4b524b3
66da4fe
47366b2
f52d5af
a400070
0195913
4814279
918388d
b7d30b7
2b4b74f
f6953e3
aa2b6de
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,6 +7,7 @@ | |
from abc import ABC, abstractmethod | ||
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional | ||
|
||
import pendulum | ||
import requests | ||
from airbyte_cdk.models import SyncMode | ||
from airbyte_cdk.sources.streams.core import StreamData | ||
|
@@ -81,6 +82,10 @@ def read_records( | |
class IncrementalMailChimpStream(MailChimpStream, ABC): | ||
state_checkpoint_interval = math.inf | ||
|
||
def __init__(self, **kwargs): | ||
self.start_date = kwargs.pop("start_date", None) | ||
super().__init__(**kwargs) | ||
|
||
@property | ||
@abstractmethod | ||
def cursor_field(self) -> str: | ||
|
@@ -129,11 +134,30 @@ def stream_slices( | |
) -> Iterable[Optional[Mapping[str, Any]]]: | ||
slice_ = {} | ||
stream_state = stream_state or {} | ||
cursor_value = stream_state.get(self.cursor_field) | ||
cursor_value = self.get_filter_date(self.start_date, stream_state.get(self.cursor_field)) | ||
if cursor_value: | ||
slice_[self.filter_field] = cursor_value | ||
yield slice_ | ||
|
||
@staticmethod | ||
def get_filter_date(start_date: str, state_date: str) -> str: | ||
""" | ||
Calculate the filter date to pass in the request parameters by comparing the start_date | ||
with the value of state obtained from the stream_slice. | ||
If only one value exists, use it by default. Otherwise, return None. | ||
If no filter_date is provided, the API will fetch all available records. | ||
""" | ||
|
||
start_date_parsed = pendulum.parse(start_date).to_iso8601_string() if start_date else None | ||
state_date_parsed = pendulum.parse(state_date).to_iso8601_string() if state_date else None | ||
|
||
if start_date_parsed and state_date_parsed: | ||
return max(start_date_parsed, state_date_parsed) | ||
elif state_date_parsed or start_date_parsed: | ||
return state_date_parsed or start_date_parsed | ||
else: | ||
return None | ||
|
||
def request_params(self, stream_state=None, stream_slice=None, **kwargs): | ||
stream_state = stream_state or {} | ||
stream_slice = stream_slice or {} | ||
|
@@ -157,7 +181,11 @@ def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Ite | |
stream_state = stream_state or {} | ||
parent = Lists(authenticator=self.authenticator).read_records(sync_mode=SyncMode.full_refresh) | ||
for slice in parent: | ||
yield {"list_id": slice["id"]} | ||
slice_ = {"list_id": slice["id"]} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe we can rename There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good call, updated |
||
cursor_value = self.get_filter_date(self.start_date, stream_state.get(slice["id"], {}).get(self.cursor_field)) | ||
if cursor_value: | ||
slice_[self.filter_field] = cursor_value | ||
yield slice_ | ||
|
||
def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: | ||
list_id = stream_slice.get("list_id") | ||
|
@@ -238,7 +266,8 @@ def stream_slices( | |
campaigns = Campaigns(authenticator=self.authenticator).read_records(sync_mode=SyncMode.full_refresh) | ||
for campaign in campaigns: | ||
slice_ = {"campaign_id": campaign["id"]} | ||
cursor_value = stream_state.get(campaign["id"], {}).get(self.cursor_field) | ||
state_value = stream_state.get(campaign["id"], {}).get(self.cursor_field) | ||
cursor_value = self.get_filter_date(self.start_date, state_value) | ||
if cursor_value: | ||
slice_[self.filter_field] = cursor_value | ||
yield slice_ | ||
|
@@ -359,17 +388,23 @@ def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: | |
|
||
def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], stream_slice, **kwargs) -> Iterable[Mapping]: | ||
""" | ||
SegmentMembers endpoint does not support sorting, so we need to filter out records that are older than the current state | ||
The SegmentMembers endpoint does not support sorting or filtering, | ||
so we need to apply our own filtering logic before reading. | ||
""" | ||
response = super().parse_response(response, **kwargs) | ||
|
||
# Calculate the filter date to compare all records against in this slice | ||
slice_cursor_value = stream_state.get(str(stream_slice.get("segment_id")), {}).get(self.cursor_field) | ||
filter_date = self.get_filter_date(self.start_date, slice_cursor_value) | ||
|
||
for record in response: | ||
# Add the segment_id foreign_key to each record | ||
record["segment_id"] = stream_slice.get("segment_id") | ||
|
||
current_cursor_value = stream_state.get(str(record.get("segment_id")), {}).get(self.cursor_field) | ||
record_cursor_value = record.get(self.cursor_field) | ||
if current_cursor_value is None or record_cursor_value >= current_cursor_value: | ||
if filter_date is None or record_cursor_value >= filter_date: | ||
# Add the segment_id foreign_key to each record | ||
record["segment_id"] = stream_slice.get("segment_id") | ||
yield record | ||
|
||
def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: | ||
|
@@ -453,15 +488,20 @@ def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: | |
campaign_id = stream_slice.get("campaign_id") | ||
return f"reports/{campaign_id}/unsubscribed" | ||
|
||
def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]: | ||
def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], stream_slice, **kwargs) -> Iterable[Mapping]: | ||
""" | ||
The Unsubscribes endpoint does not support sorting or filtering, | ||
so we need to apply our own filtering logic before reading. | ||
""" | ||
|
||
response = super().parse_response(response, **kwargs) | ||
|
||
# Unsubscribes endpoint does not support sorting, so we need to filter out records that are older than the current state | ||
slice_cursor_value = stream_state.get(stream_slice.get("campaign_id", {}), {}).get(self.cursor_field) | ||
filter_date = self.get_filter_date(self.start_date, slice_cursor_value) | ||
|
||
for record in response: | ||
current_cursor_value = stream_state.get(record.get("campaign_id"), {}).get(self.cursor_field) | ||
record_cursor_value = record.get(self.cursor_field) | ||
if current_cursor_value is None or record_cursor_value >= current_cursor_value: | ||
if filter_date is None or record_cursor_value >= filter_date: | ||
yield record | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is can be separate filtration function check Hubspot connector as example There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for the pointer 👍 A separate method, filter_old_records, is now declared in the base MailchimpIncrementalStream class and invoked by streams which do not support the server-side filter. |
||
|
||
def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated