🐛 Source Google Ads: Fix incremental events streams (#32102)
tolik0 committed Nov 3, 2023
1 parent c54c84e commit bc500ee
Showing 5 changed files with 197 additions and 18 deletions.
@@ -29,12 +29,12 @@
"type": "STREAM",
"stream": {
"stream_descriptor": {
"name": "ad_group_criterions"
"name": "ad_group_criterion"
},
"stream_state": {
"change_status": {
"4651612872": {
"change_status.last_change_date_time": "2023-08-01 13:20:01.003295"
"change_status.last_change_date_time": "2023-11-01 13:20:01.003295"
}
}
}
@@ -44,12 +44,12 @@
"type": "STREAM",
"stream": {
"stream_descriptor": {
"name": "ad_listing_group_criterions"
"name": "ad_listing_group_criterion"
},
"stream_state": {
"change_status": {
"4651612872": {
"change_status.last_change_date_time": "2023-08-01 13:20:01.003295"
"change_status.last_change_date_time": "2023-11-01 13:20:01.003295"
}
}
}
@@ -64,7 +64,7 @@
"stream_state": {
"change_status": {
"4651612872": {
"change_status.last_change_date_time": "2023-08-01 13:20:01.003295"
"change_status.last_change_date_time": "2023-11-01 13:20:01.003295"
}
}
}
@@ -11,7 +11,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 253487c0-2246-43ba-a21f-5116b20a2c50
-  dockerImageTag: 2.0.2
+  dockerImageTag: 2.0.3
dockerRepository: airbyte/source-google-ads
documentationUrl: https://docs.airbyte.com/integrations/sources/google-ads
githubIssueLabel: source-google-ads
@@ -666,36 +666,63 @@ def parse_response(self, response: SearchPager, stream_slice: MutableMapping[str
record[self.cursor_field] = cursor_value
yield record

-    def _update_state(self):
+    def _update_state(self, stream_slice: MutableMapping[str, Any]):
+        customer_id = stream_slice.get("customer_id")
+
         # if parent stream was used - copy state from it, otherwise set default state
-        if self.parent_stream.state:
-            self._state = {self.parent_stream_name: self.parent_stream.state}
+        if isinstance(self.parent_stream.state, dict) and self.parent_stream.state.get(customer_id):
+            self._state[self.parent_stream_name][customer_id] = self.parent_stream.state[customer_id]
         else:
+            parent_state = {self.parent_cursor_field: pendulum.today().start_of("day").format(self.parent_stream.cursor_time_format)}
             # full refresh sync without parent stream
-            self._state = {
-                self.parent_stream_name: {
-                    self.parent_cursor_field: pendulum.today().start_of("day").format(self.parent_stream.cursor_time_format)
-                }
-            }
+            self._state[self.parent_stream_name].update({customer_id: parent_state})
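A rough sketch of the resulting state shape (the "change_status" key and the first customer id come from the state samples above; the second id and timestamps are made up): each slice now merges only its own customer id instead of overwriting the whole parent state.

state = {
    "change_status": {
        "4651612872": {"change_status.last_change_date_time": "2023-11-01 13:20:01.003295"},
        "1234567890": {"change_status.last_change_date_time": "2023-11-02 00:00:00.000000"},
    }
}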

def _read_deleted_records(self, stream_slice: MutableMapping[str, Any] = None):
# yield deleted records with id and time when record was deleted
for deleted_record_id in stream_slice.get("deleted_ids", []):
yield {self.id_field: deleted_record_id, "deleted_at": stream_slice["record_changed_time_map"].get(deleted_record_id)}
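Assuming `self.id_field` resolves to `"id"` (an assumption, not shown in this diff), a minimal standalone sketch of the tombstones this yields for a hypothetical slice:

stream_slice = {
    "deleted_ids": {44, 55},
    "record_changed_time_map": {
        44: "2023-11-01 10:00:00.000000",
        55: "2023-11-01 11:00:00.000000",
    },
}

# One tombstone per deleted id: the record id plus the time it changed.
tombstones = [
    {"id": deleted_id, "deleted_at": stream_slice["record_changed_time_map"][deleted_id]}
    for deleted_id in sorted(stream_slice["deleted_ids"])
]
assert tombstones[0] == {"id": 44, "deleted_at": "2023-11-01 10:00:00.000000"}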

def _split_slice(self, child_slice: MutableMapping[str, Any], chunk_size: int = 10000) -> Iterable[Mapping[str, Any]]:
"""
Splits a child slice into smaller chunks based on the chunk_size.
Parameters:
- child_slice (MutableMapping[str, Any]): The input dictionary to split.
- chunk_size (int, optional): The maximum number of ids per chunk. Defaults to 10000,
because it is the maximum number of ids that can be present in a query filter.
Yields:
- Mapping[str, Any]: A dictionary with a similar structure to child_slice.
"""
updated_ids = list(child_slice["updated_ids"])
if not updated_ids:
yield child_slice
return

record_changed_time_map = child_slice["record_changed_time_map"]
customer_id = child_slice["customer_id"]

# Split the updated_ids into chunks and yield them
for i in range(0, len(updated_ids), chunk_size):
chunk_ids = set(updated_ids[i : i + chunk_size])
chunk_time_map = {k: record_changed_time_map[k] for k in chunk_ids}

yield {"updated_ids": chunk_ids, "record_changed_time_map": chunk_time_map, "customer_id": customer_id, "deleted_ids": set()}

def read_records(
self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_slice: MutableMapping[str, Any] = None, **kwargs
) -> Iterable[Mapping[str, Any]]:
"""
This method is overridden to read records using parent stream
"""
# if state is present read records by ids from slice otherwise full refresh sync
-        yield from super().read_records(sync_mode, stream_slice=stream_slice)
+        for stream_slice_part in self._split_slice(stream_slice):
+            yield from super().read_records(sync_mode, stream_slice=stream_slice_part)

# yield deleted items
yield from self._read_deleted_records(stream_slice)

-        self._update_state()
+        self._update_state(stream_slice)
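Taken together, one slice now flows through three steps: chunked reads of updated ids, tombstones for deleted ids, then a per-customer state merge. A simplified, self-contained sketch of that ordering (the helper names are stand-ins for the methods above, not connector code):

def split_slice(stream_slice):  # stand-in for _split_slice
    yield stream_slice

def read_updated(part):  # stand-in for super().read_records
    yield {"id": 1}

def read_deleted(stream_slice):  # stand-in for _read_deleted_records
    yield {"id": 2, "deleted_at": "2023-11-01 10:00:00.000000"}

def read_records_sketch(stream_slice):
    for part in split_slice(stream_slice):
        yield from read_updated(part)  # 1. updated records, in id chunks
    yield from read_deleted(stream_slice)  # 2. deleted-record tombstones
    # 3. the per-customer state merge for this slice would happen here

assert len(list(read_records_sketch({"customer_id": "x"}))) == 2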

def get_query(self, stream_slice: Mapping[str, Any] = None) -> str:
table_name = get_resource_name(self.name)
@@ -3,8 +3,9 @@
#

from copy import deepcopy
-from unittest.mock import DEFAULT, Mock, call
+from unittest.mock import DEFAULT, MagicMock, Mock, call

import pendulum
import pytest
from airbyte_cdk.models import SyncMode
from airbyte_cdk.utils import AirbyteTracedException
@@ -417,3 +418,153 @@ def test_incremental_events_stream_get_query(mocker, config, customers):

# Check if the query generated by the get_query method matches the expected query
assert are_queries_equivalent(query, expected_query)


def test_read_records_with_slice_splitting(mocker, config):
"""
Test the read_records method to ensure it correctly splits the stream_slice and calls the parent's read_records.
"""
# Define a stream_slice with 15,000 ids to ensure it gets split during processing
stream_slice = {
"updated_ids": set(range(15000)),
"record_changed_time_map": {i: f"time_{i}" for i in range(15000)},
"customer_id": "sample_customer_id",
"deleted_ids": set(),
}

# Create a mock instance of the CampaignCriterion stream
google_api = MockGoogleAds(credentials=config["credentials"])
stream = CampaignCriterion(api=google_api, customers=[])

# Mock methods that are expected to be called during the test
super_read_records_mock = MagicMock()
mocker.patch("source_google_ads.streams.GoogleAdsStream.read_records", super_read_records_mock)
read_deleted_records_mock = mocker.patch.object(stream, "_read_deleted_records", return_value=[])
update_state_mock = mocker.patch.object(stream, "_update_state")

# Execute the method under test
list(stream.read_records(SyncMode.incremental, stream_slice=stream_slice))

# Verify that the parent's read_records method was called twice due to splitting
assert super_read_records_mock.call_count == 2

# Define the expected slices after the stream_slice is split
expected_first_slice = {
"updated_ids": set(range(10000)),
"record_changed_time_map": {i: f"time_{i}" for i in range(10000)},
"customer_id": "sample_customer_id",
"deleted_ids": set(),
}
expected_second_slice = {
"updated_ids": set(range(10000, 15000)),
"record_changed_time_map": {i: f"time_{i}" for i in range(10000, 15000)},
"customer_id": "sample_customer_id",
"deleted_ids": set(),
}

# Verify the arguments passed to the parent's read_records method for both calls
first_call_args, first_call_kwargs = super_read_records_mock.call_args_list[0]
assert first_call_args[0] == SyncMode.incremental
assert first_call_kwargs["stream_slice"] == expected_first_slice
second_call_args, second_call_kwargs = super_read_records_mock.call_args_list[1]
assert second_call_args[0] == SyncMode.incremental
assert second_call_kwargs["stream_slice"] == expected_second_slice

# Ensure that the mocked methods were called as expected
read_deleted_records_mock.assert_called_once_with(stream_slice)
update_state_mock.assert_called_once_with(stream_slice)


def test_update_state_with_parent_state(mocker):
"""
Test the _update_state method when the parent_stream has a state.
"""
# Mock instance setup
stream = CampaignCriterion(api=MagicMock(), customers=[])

# Mock parent_stream with initial state
stream.parent_stream.state = {"customer_id_1": {"change_status.last_change_date_time": "2023-10-20 00:00:00.000000"}}

# Call the _update_state method with the first stream_slice
stream_slice_first = {"customer_id": "customer_id_1"}
stream._update_state(stream_slice_first)

# Assert the state after the first call
expected_state_first_call = {"change_status": {"customer_id_1": {"change_status.last_change_date_time": "2023-10-20 00:00:00.000000"}}}
assert stream._state == expected_state_first_call

# Update the parent_stream state for the second call
stream.parent_stream.state = {"customer_id_2": {"change_status.last_change_date_time": "2023-10-21 00:00:00.000000"}}

# Call the _update_state method with the second stream_slice
stream_slice_second = {"customer_id": "customer_id_2"}
stream._update_state(stream_slice_second)

# Assert the state after the second call
expected_state_second_call = {
"change_status": {
"customer_id_1": {"change_status.last_change_date_time": "2023-10-20 00:00:00.000000"},
"customer_id_2": {"change_status.last_change_date_time": "2023-10-21 00:00:00.000000"},
}
}
assert stream._state == expected_state_second_call

# Set pendulum to return a consistent value
now = pendulum.datetime(2023, 11, 2, 12, 53, 7)
pendulum.set_test_now(now)

# Call the _update_state method with the third stream_slice
stream_slice_third = {"customer_id": "customer_id_3"}
stream._update_state(stream_slice_third)

# Assert the state after the third call
expected_state_third_call = {
"change_status": {
"customer_id_1": {"change_status.last_change_date_time": "2023-10-20 00:00:00.000000"},
"customer_id_2": {"change_status.last_change_date_time": "2023-10-21 00:00:00.000000"},
"customer_id_3": {"change_status.last_change_date_time": "2023-11-02 00:00:00.000000"},
}
}
assert stream._state == expected_state_third_call

# Reset the pendulum mock to its original state
pendulum.set_test_now()


def test_update_state_without_parent_state(mocker):
"""
Test the _update_state method when the parent_stream does not have a state.
"""
# Reset any previous mock state for pendulum
pendulum.set_test_now()

# Mock instance setup
stream = CampaignCriterion(api=MagicMock(), customers=[])

# Mock pendulum call to return a consistent value
now = pendulum.datetime(2023, 11, 2, 12, 53, 7)
pendulum.set_test_now(now)

# Call the _update_state method with the first stream_slice
stream_slice_first = {"customer_id": "customer_id_1"}
stream._update_state(stream_slice_first)

# Assert the state after the first call
expected_state_first_call = {"change_status": {"customer_id_1": {"change_status.last_change_date_time": "2023-11-02 00:00:00.000000"}}}
assert stream._state == expected_state_first_call

# Call the _update_state method with the second stream_slice
stream_slice_second = {"customer_id": "customer_id_2"}
stream._update_state(stream_slice_second)

# Assert the state after the second call
expected_state_second_call = {
"change_status": {
"customer_id_1": {"change_status.last_change_date_time": "2023-11-02 00:00:00.000000"},
"customer_id_2": {"change_status.last_change_date_time": "2023-11-02 00:00:00.000000"},
}
}
assert stream._state == expected_state_second_call

# Reset the pendulum mock to its original state
pendulum.set_test_now()
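
The `pendulum.set_test_now` calls above freeze "now" so the start-of-day default cursor is deterministic. A minimal standalone example of the pattern (the format string mirrors the cursor values seen in the state samples, an assumption about the connector's cursor_time_format):

import pendulum

pendulum.set_test_now(pendulum.datetime(2023, 11, 2, 12, 53, 7))
assert pendulum.today("UTC").format("YYYY-MM-DD HH:mm:ss.SSSSSS") == "2023-11-02 00:00:00.000000"
pendulum.set_test_now()  # reset the frozen clock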
3 changes: 2 additions & 1 deletion docs/integrations/sources/google-ads.md
@@ -278,7 +278,8 @@ Due to a limitation in the Google Ads API which does not allow getting performan

| Version | Date | Pull Request | Subject |
|:---------|:-----------|:---------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------------------|
-| `2.0.2`  | 2023-10-31 | [32001](https://github.com/airbytehq/airbyte/pull/32001) | Added handling (retry) for `InternalServerError` while reading the streams |
+| `2.0.3`  | 2023-11-02 | [32102](https://github.com/airbytehq/airbyte/pull/32102) | Fix incremental events streams |
+| `2.0.2`  | 2023-10-31 | [32001](https://github.com/airbytehq/airbyte/pull/32001) | Added handling (retry) for `InternalServerError` while reading the streams |
| `2.0.1` | 2023-10-27 | [31908](https://github.com/airbytehq/airbyte/pull/31908) | Base image migration: remove Dockerfile and use the python-connector-base image |
| `2.0.0`  | 2023-10-04 | [31048](https://github.com/airbytehq/airbyte/pull/31048) | Fix schema of default streams, change names of streams. |
| `1.0.0` | 2023-09-28 | [30705](https://github.com/airbytehq/airbyte/pull/30705) | Fix schemas for custom queries |
