diff --git a/airbyte-integrations/connectors/source-google-ads/integration_tests/state.json b/airbyte-integrations/connectors/source-google-ads/integration_tests/state.json
index ce29ca2dbd2d7..60ad9790af5fa 100644
--- a/airbyte-integrations/connectors/source-google-ads/integration_tests/state.json
+++ b/airbyte-integrations/connectors/source-google-ads/integration_tests/state.json
@@ -29,12 +29,12 @@
     "type": "STREAM",
     "stream": {
       "stream_descriptor": {
-        "name": "ad_group_criterions"
+        "name": "ad_group_criterion"
       },
       "stream_state": {
         "change_status": {
           "4651612872": {
-            "change_status.last_change_date_time": "2023-08-01 13:20:01.003295"
+            "change_status.last_change_date_time": "2023-11-01 13:20:01.003295"
           }
         }
       }
@@ -44,12 +44,12 @@
     "type": "STREAM",
     "stream": {
       "stream_descriptor": {
-        "name": "ad_listing_group_criterions"
+        "name": "ad_listing_group_criterion"
      },
       "stream_state": {
         "change_status": {
           "4651612872": {
-            "change_status.last_change_date_time": "2023-08-01 13:20:01.003295"
+            "change_status.last_change_date_time": "2023-11-01 13:20:01.003295"
           }
         }
       }
@@ -64,7 +64,7 @@
       "stream_state": {
         "change_status": {
           "4651612872": {
-            "change_status.last_change_date_time": "2023-08-01 13:20:01.003295"
+            "change_status.last_change_date_time": "2023-11-01 13:20:01.003295"
           }
         }
       }
diff --git a/airbyte-integrations/connectors/source-google-ads/metadata.yaml b/airbyte-integrations/connectors/source-google-ads/metadata.yaml
index 2b4a17d44bb88..9df42221f3ffc 100644
--- a/airbyte-integrations/connectors/source-google-ads/metadata.yaml
+++ b/airbyte-integrations/connectors/source-google-ads/metadata.yaml
@@ -11,7 +11,7 @@ data:
   connectorSubtype: api
   connectorType: source
   definitionId: 253487c0-2246-43ba-a21f-5116b20a2c50
-  dockerImageTag: 2.0.2
+  dockerImageTag: 2.0.3
   dockerRepository: airbyte/source-google-ads
   documentationUrl: https://docs.airbyte.com/integrations/sources/google-ads
   githubIssueLabel: source-google-ads
diff --git a/airbyte-integrations/connectors/source-google-ads/source_google_ads/streams.py b/airbyte-integrations/connectors/source-google-ads/source_google_ads/streams.py
index c37d574841099..dca2ebed9fad1 100644
--- a/airbyte-integrations/connectors/source-google-ads/source_google_ads/streams.py
+++ b/airbyte-integrations/connectors/source-google-ads/source_google_ads/streams.py
@@ -666,23 +666,49 @@ def parse_response(self, response: SearchPager, stream_slice: MutableMapping[str
             record[self.cursor_field] = cursor_value
             yield record
 
-    def _update_state(self):
+    def _update_state(self, stream_slice: MutableMapping[str, Any]):
+        customer_id = stream_slice.get("customer_id")
+
         # if parent stream was used - copy state from it, otherwise set default state
-        if self.parent_stream.state:
-            self._state = {self.parent_stream_name: self.parent_stream.state}
+        if isinstance(self.parent_stream.state, dict) and self.parent_stream.state.get(customer_id):
+            self._state[self.parent_stream_name][customer_id] = self.parent_stream.state[customer_id]
         else:
+            parent_state = {self.parent_cursor_field: pendulum.today().start_of("day").format(self.parent_stream.cursor_time_format)}
             # full refresh sync without parent stream
-            self._state = {
-                self.parent_stream_name: {
-                    self.parent_cursor_field: pendulum.today().start_of("day").format(self.parent_stream.cursor_time_format)
-                }
-            }
+            self._state[self.parent_stream_name].update({customer_id: parent_state})
 
     def _read_deleted_records(self, stream_slice: MutableMapping[str, Any] = None):
         # yield deleted records with id and time when record was deleted
         for deleted_record_id in stream_slice.get("deleted_ids", []):
             yield {self.id_field: deleted_record_id, "deleted_at": stream_slice["record_changed_time_map"].get(deleted_record_id)}
 
+    def _split_slice(self, child_slice: MutableMapping[str, Any], chunk_size: int = 10000) -> Iterable[Mapping[str, Any]]:
+        """
+        Splits a child slice into smaller chunks based on the chunk_size.
+
+        Parameters:
+        - child_slice (MutableMapping[str, Any]): The input dictionary to split.
+        - chunk_size (int, optional): The maximum number of ids per chunk. Defaults to 10000,
+          because it is the maximum number of ids that can be present in a query filter.
+
+        Yields:
+        - Mapping[str, Any]: A dictionary with a similar structure to child_slice.
+        """
+        updated_ids = list(child_slice["updated_ids"])
+        if not updated_ids:
+            yield child_slice
+            return
+
+        record_changed_time_map = child_slice["record_changed_time_map"]
+        customer_id = child_slice["customer_id"]
+
+        # Split the updated_ids into chunks and yield them
+        for i in range(0, len(updated_ids), chunk_size):
+            chunk_ids = set(updated_ids[i : i + chunk_size])
+            chunk_time_map = {k: record_changed_time_map[k] for k in chunk_ids}
+
+            yield {"updated_ids": chunk_ids, "record_changed_time_map": chunk_time_map, "customer_id": customer_id, "deleted_ids": set()}
+
     def read_records(
         self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_slice: MutableMapping[str, Any] = None, **kwargs
     ) -> Iterable[Mapping[str, Any]]:
@@ -690,12 +716,13 @@ def read_records(
         This method is overridden to read records using parent stream
         """
         # if state is present read records by ids from slice otherwise full refresh sync
-        yield from super().read_records(sync_mode, stream_slice=stream_slice)
+        for stream_slice_part in self._split_slice(stream_slice):
+            yield from super().read_records(sync_mode, stream_slice=stream_slice_part)
 
         # yield deleted items
         yield from self._read_deleted_records(stream_slice)
 
-        self._update_state()
+        self._update_state(stream_slice)
 
     def get_query(self, stream_slice: Mapping[str, Any] = None) -> str:
         table_name = get_resource_name(self.name)
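Reviewer note: per the `_split_slice` docstring above, 10,000 is the maximum number of ids that can be placed in a single query filter, which is why `read_records` now iterates over chunked slices. A minimal standalone sketch of that chunking behaviour follows; it is illustrative only, detached from the stream classes, and assumes the slice shape used in `streams.py` (`updated_ids`, `record_changed_time_map`, `customer_id`, `deleted_ids`):

```python
# Illustrative sketch of the chunking introduced above (not the connector's code).
from typing import Any, Iterable, Mapping, MutableMapping

CHUNK_SIZE = 10_000  # per the docstring above: max ids allowed in one query filter


def split_slice(child_slice: MutableMapping[str, Any], chunk_size: int = CHUNK_SIZE) -> Iterable[Mapping[str, Any]]:
    updated_ids = list(child_slice["updated_ids"])
    if not updated_ids:
        # nothing to split: pass the slice through unchanged (full refresh case)
        yield child_slice
        return
    for i in range(0, len(updated_ids), chunk_size):
        chunk = updated_ids[i : i + chunk_size]
        yield {
            "updated_ids": set(chunk),
            "record_changed_time_map": {k: child_slice["record_changed_time_map"][k] for k in chunk},
            "customer_id": child_slice["customer_id"],
            # deletions are yielded once per original slice, not per chunk
            "deleted_ids": set(),
        }


# 15,000 updated ids split into chunks of 10,000 and 5,000 - the case the new unit test covers
sample = {
    "updated_ids": set(range(15_000)),
    "record_changed_time_map": {i: f"time_{i}" for i in range(15_000)},
    "customer_id": "123",
    "deleted_ids": set(),
}
assert [len(part["updated_ids"]) for part in split_slice(sample)] == [10_000, 5_000]
```

Emitting an empty `deleted_ids` set in every chunk matters: `read_records` yields deleted records exactly once, by calling `_read_deleted_records(stream_slice)` on the original slice after all chunks have been read.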
diff --git a/airbyte-integrations/connectors/source-google-ads/unit_tests/test_incremental_events_streams.py b/airbyte-integrations/connectors/source-google-ads/unit_tests/test_incremental_events_streams.py
index a1359e301441b..b136cee2cb1c9 100644
--- a/airbyte-integrations/connectors/source-google-ads/unit_tests/test_incremental_events_streams.py
+++ b/airbyte-integrations/connectors/source-google-ads/unit_tests/test_incremental_events_streams.py
@@ -3,8 +3,9 @@
 #
 
 from copy import deepcopy
-from unittest.mock import DEFAULT, Mock, call
+from unittest.mock import DEFAULT, MagicMock, Mock, call
 
+import pendulum
 import pytest
 from airbyte_cdk.models import SyncMode
 from airbyte_cdk.utils import AirbyteTracedException
@@ -417,3 +418,153 @@ def test_incremental_events_stream_get_query(mocker, config, customers):
 
     # Check if the query generated by the get_query method matches the expected query
     assert are_queries_equivalent(query, expected_query)
+
+
+def test_read_records_with_slice_splitting(mocker, config):
+    """
+    Test the read_records method to ensure it correctly splits the stream_slice and calls the parent's read_records.
+    """
+    # Define a stream_slice with 15,000 ids to ensure it gets split during processing
+    stream_slice = {
+        "updated_ids": set(range(15000)),
+        "record_changed_time_map": {i: f"time_{i}" for i in range(15000)},
+        "customer_id": "sample_customer_id",
+        "deleted_ids": set(),
+    }
+
+    # Create a mock instance of the CampaignCriterion stream
+    google_api = MockGoogleAds(credentials=config["credentials"])
+    stream = CampaignCriterion(api=google_api, customers=[])
+
+    # Mock methods that are expected to be called during the test
+    super_read_records_mock = MagicMock()
+    mocker.patch("source_google_ads.streams.GoogleAdsStream.read_records", super_read_records_mock)
+    read_deleted_records_mock = mocker.patch.object(stream, "_read_deleted_records", return_value=[])
+    update_state_mock = mocker.patch.object(stream, "_update_state")
+
+    # Execute the method under test
+    list(stream.read_records(SyncMode.incremental, stream_slice=stream_slice))
+
+    # Verify that the parent's read_records method was called twice due to splitting
+    assert super_read_records_mock.call_count == 2
+
+    # Define the expected slices after the stream_slice is split
+    expected_first_slice = {
+        "updated_ids": set(range(10000)),
+        "record_changed_time_map": {i: f"time_{i}" for i in range(10000)},
+        "customer_id": "sample_customer_id",
+        "deleted_ids": set(),
+    }
+    expected_second_slice = {
+        "updated_ids": set(range(10000, 15000)),
+        "record_changed_time_map": {i: f"time_{i}" for i in range(10000, 15000)},
+        "customer_id": "sample_customer_id",
+        "deleted_ids": set(),
+    }
+
+    # Verify the arguments passed to the parent's read_records method for both calls
+    first_call_args, first_call_kwargs = super_read_records_mock.call_args_list[0]
+    assert first_call_args[0] == SyncMode.incremental
+    assert first_call_kwargs["stream_slice"] == expected_first_slice
+    second_call_args, second_call_kwargs = super_read_records_mock.call_args_list[1]
+    assert second_call_args[0] == SyncMode.incremental
+    assert second_call_kwargs["stream_slice"] == expected_second_slice
+
+    # Ensure that the mocked methods were called as expected
+    read_deleted_records_mock.assert_called_once_with(stream_slice)
+    update_state_mock.assert_called_once_with(stream_slice)
+ """ + # Define a stream_slice with 15,000 ids to ensure it gets split during processing + stream_slice = { + "updated_ids": set(range(15000)), + "record_changed_time_map": {i: f"time_{i}" for i in range(15000)}, + "customer_id": "sample_customer_id", + "deleted_ids": set(), + } + + # Create a mock instance of the CampaignCriterion stream + google_api = MockGoogleAds(credentials=config["credentials"]) + stream = CampaignCriterion(api=google_api, customers=[]) + + # Mock methods that are expected to be called during the test + super_read_records_mock = MagicMock() + mocker.patch("source_google_ads.streams.GoogleAdsStream.read_records", super_read_records_mock) + read_deleted_records_mock = mocker.patch.object(stream, "_read_deleted_records", return_value=[]) + update_state_mock = mocker.patch.object(stream, "_update_state") + + # Execute the method under test + list(stream.read_records(SyncMode.incremental, stream_slice=stream_slice)) + + # Verify that the parent's read_records method was called twice due to splitting + assert super_read_records_mock.call_count == 2 + + # Define the expected slices after the stream_slice is split + expected_first_slice = { + "updated_ids": set(range(10000)), + "record_changed_time_map": {i: f"time_{i}" for i in range(10000)}, + "customer_id": "sample_customer_id", + "deleted_ids": set(), + } + expected_second_slice = { + "updated_ids": set(range(10000, 15000)), + "record_changed_time_map": {i: f"time_{i}" for i in range(10000, 15000)}, + "customer_id": "sample_customer_id", + "deleted_ids": set(), + } + + # Verify the arguments passed to the parent's read_records method for both calls + first_call_args, first_call_kwargs = super_read_records_mock.call_args_list[0] + assert first_call_args[0] == SyncMode.incremental + assert first_call_kwargs["stream_slice"] == expected_first_slice + second_call_args, second_call_kwargs = super_read_records_mock.call_args_list[1] + assert second_call_args[0] == SyncMode.incremental + assert second_call_kwargs["stream_slice"] == expected_second_slice + + # Ensure that the mocked methods were called as expected + read_deleted_records_mock.assert_called_once_with(stream_slice) + update_state_mock.assert_called_once_with(stream_slice) + + +def test_update_state_with_parent_state(mocker): + """ + Test the _update_state method when the parent_stream has a state. 
+ """ + # Mock instance setup + stream = CampaignCriterion(api=MagicMock(), customers=[]) + + # Mock parent_stream with initial state + stream.parent_stream.state = {"customer_id_1": {"change_status.last_change_date_time": "2023-10-20 00:00:00.000000"}} + + # Call the _update_state method with the first stream_slice + stream_slice_first = {"customer_id": "customer_id_1"} + stream._update_state(stream_slice_first) + + # Assert the state after the first call + expected_state_first_call = {"change_status": {"customer_id_1": {"change_status.last_change_date_time": "2023-10-20 00:00:00.000000"}}} + assert stream._state == expected_state_first_call + + # Update the parent_stream state for the second call + stream.parent_stream.state = {"customer_id_2": {"change_status.last_change_date_time": "2023-10-21 00:00:00.000000"}} + + # Call the _update_state method with the second stream_slice + stream_slice_second = {"customer_id": "customer_id_2"} + stream._update_state(stream_slice_second) + + # Assert the state after the second call + expected_state_second_call = { + "change_status": { + "customer_id_1": {"change_status.last_change_date_time": "2023-10-20 00:00:00.000000"}, + "customer_id_2": {"change_status.last_change_date_time": "2023-10-21 00:00:00.000000"}, + } + } + assert stream._state == expected_state_second_call + + # Set pendulum to return a consistent value + now = pendulum.datetime(2023, 11, 2, 12, 53, 7) + pendulum.set_test_now(now) + + # Call the _update_state method with the third stream_slice + stream_slice_third = {"customer_id": "customer_id_3"} + stream._update_state(stream_slice_third) + + # Assert the state after the third call + expected_state_third_call = { + "change_status": { + "customer_id_1": {"change_status.last_change_date_time": "2023-10-20 00:00:00.000000"}, + "customer_id_2": {"change_status.last_change_date_time": "2023-10-21 00:00:00.000000"}, + "customer_id_3": {"change_status.last_change_date_time": "2023-11-02 00:00:00.000000"}, + } + } + assert stream._state == expected_state_third_call + + # Reset the pendulum mock to its original state + pendulum.set_test_now() + + +def test_update_state_without_parent_state(mocker): + """ + Test the _update_state method when the parent_stream does not have a state. 
+ """ + # Reset any previous mock state for pendulum + pendulum.set_test_now() + + # Mock instance setup + stream = CampaignCriterion(api=MagicMock(), customers=[]) + + # Mock pendulum call to return a consistent value + now = pendulum.datetime(2023, 11, 2, 12, 53, 7) + pendulum.set_test_now(now) + + # Call the _update_state method with the first stream_slice + stream_slice_first = {"customer_id": "customer_id_1"} + stream._update_state(stream_slice_first) + + # Assert the state after the first call + expected_state_first_call = {"change_status": {"customer_id_1": {"change_status.last_change_date_time": "2023-11-02 00:00:00.000000"}}} + assert stream._state == expected_state_first_call + + # Call the _update_state method with the second stream_slice + stream_slice_second = {"customer_id": "customer_id_2"} + stream._update_state(stream_slice_second) + + # Assert the state after the second call + expected_state_second_call = { + "change_status": { + "customer_id_1": {"change_status.last_change_date_time": "2023-11-02 00:00:00.000000"}, + "customer_id_2": {"change_status.last_change_date_time": "2023-11-02 00:00:00.000000"}, + } + } + assert stream._state == expected_state_second_call + + # Reset the pendulum mock to its original state + pendulum.set_test_now() diff --git a/docs/integrations/sources/google-ads.md b/docs/integrations/sources/google-ads.md index 568822891357a..d9085e91de502 100644 --- a/docs/integrations/sources/google-ads.md +++ b/docs/integrations/sources/google-ads.md @@ -278,7 +278,8 @@ Due to a limitation in the Google Ads API which does not allow getting performan | Version | Date | Pull Request | Subject | |:---------|:-----------|:---------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------------------| -| `2.0.2` | 2023-10-31 | [32001](https://github.com/airbytehq/airbyte/pull/32001) | Added handling (retry) for `InternalServerError` while reading the streams | +| `2.0.3` | 2023-11-02 | [32102](https://github.com/airbytehq/airbyte/pull/32102) | Fix incremental events streams | +| `2.0.2` | 2023-10-31 | [32001](https://github.com/airbytehq/airbyte/pull/32001) | Added handling (retry) for `InternalServerError` while reading the streams | | `2.0.1` | 2023-10-27 | [31908](https://github.com/airbytehq/airbyte/pull/31908) | Base image migration: remove Dockerfile and use the python-connector-base image | | `2.0.0` | 2023-10-04 | [31048](https://github.com/airbytehq/airbyte/pull/31048) | Fix schem default streams, change names of streams. | | `1.0.0` | 2023-09-28 | [30705](https://github.com/airbytehq/airbyte/pull/30705) | Fix schemas for custom queries |