Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source Mailchimp: revert extra logging #22405

Merged
merged 3 commits into from Feb 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -979,7 +979,7 @@
- name: Mailchimp
sourceDefinitionId: b03a9f3e-22a5-11eb-adc1-0242ac120002
dockerRepository: airbyte/source-mailchimp
dockerImageTag: 0.3.3
dockerImageTag: 0.3.4
documentationUrl: https://docs.airbyte.com/integrations/sources/mailchimp
icon: mailchimp.svg
sourceType: api
Expand Down
Expand Up @@ -7895,7 +7895,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-mailchimp:0.3.3"
- dockerImage: "airbyte/source-mailchimp:0.3.4"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/mailchimp"
connectionSpecification:
Expand Down
Expand Up @@ -12,5 +12,5 @@ COPY main.py ./
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.3.3
LABEL io.airbyte.version=0.3.4
LABEL io.airbyte.name=airbyte/source-mailchimp
Expand Up @@ -55,7 +55,6 @@ def request_params(
return params

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
self.logger.info(f"Parsing response for stream {self.name}")
response_json = response.json()
yield from response_json[self.data_field]

Expand Down Expand Up @@ -98,13 +97,11 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late
def stream_slices(
self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
self.logger.info(f"Slicing stream: {self.name}")
slice_ = {}
stream_state = stream_state or {}
cursor_value = stream_state.get(self.cursor_field)
if cursor_value:
slice_[self.filter_field] = cursor_value
self.logger.info(f"Yielding slice {slice_}")
yield slice_

def request_params(self, stream_state=None, stream_slice=None, **kwargs):
Expand All @@ -113,7 +110,6 @@ def request_params(self, stream_state=None, stream_slice=None, **kwargs):
params = super().request_params(stream_state=stream_state, stream_slice=stream_slice, **kwargs)
default_params = {"sort_field": self.sort_field, "sort_dir": "ASC", **stream_slice}
params.update(default_params)
self.logger.info(f"Request params are {params}")
return params


Expand Down Expand Up @@ -148,20 +144,16 @@ def stream_slices(
self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
stream_state = stream_state or {}
self.logger.info(f"Slicing the stream: {self.name}")
if self.campaign_id:
# this is a workaround to speed up SATs and enable incremental tests
campaigns = [{"id": self.campaign_id}]
else:
self.logger.info("Reading campaigns")
campaigns = Campaigns(authenticator=self.authenticator).read_records(sync_mode=SyncMode.full_refresh)
self.logger.info("Starting for loop to slice the stream")
for campaign in campaigns:
slice_ = {"campaign_id": campaign["id"]}
cursor_value = stream_state.get(campaign["id"], {}).get(self.cursor_field)
if cursor_value:
slice_[self.filter_field] = cursor_value
self.logger.info(f"Yielding slice {slice_}")
yield slice_

def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
Expand All @@ -187,7 +179,6 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late
return current_stream_state

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
self.logger.info(f"Parsing response for stream {self.name}")
response_json = response.json()
# transform before save
# [{'campaign_id', 'list_id', 'list_is_active', 'email_id', 'email_address', 'activity[array[object]]', '_links'}] ->
Expand All @@ -196,4 +187,3 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
for item in data:
for activity_item in item.pop("activity", []):
yield {**item, **activity_item}
self.logger.info("Parsed response")
1 change: 1 addition & 0 deletions docs/integrations/sources/mailchimp.md
Expand Up @@ -230,6 +230,7 @@ Now that you have set up the Mailchimp source connector, check out the following

| Version | Date | Pull Request | Subject |
|---------|------------|----------------------------------------------------------|----------------------------------------------------------------------------|
| 0.3.4 | 2023-02-06 | [22405](https://github.com/airbytehq/airbyte/pull/22405) | Revert extra logging |
| 0.3.3 | 2023-02-01 | [22228](https://github.com/airbytehq/airbyte/pull/22228) | Add extra logging |
| 0.3.2 | 2023-01-27 | [22014](https://github.com/airbytehq/airbyte/pull/22014) | Set `AvailabilityStrategy` for streams explicitly to `None` |
| 0.3.1 | 2022-12-20 | [20720](https://github.com/airbytehq/airbyte/pull/20720) | Use stream slices as a source for request params instead of a stream state |
Expand Down