🐛 Source Google Ads: Fix error for new customers for incremental events streams #35664

Merged
@@ -11,7 +11,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 253487c0-2246-43ba-a21f-5116b20a2c50
dockerImageTag: 3.3.5
dockerImageTag: 3.3.6
dockerRepository: airbyte/source-google-ads
documentationUrl: https://docs.airbyte.com/integrations/sources/google-ads
githubIssueLabel: source-google-ads
183 changes: 91 additions & 92 deletions airbyte-integrations/connectors/source-google-ads/poetry.lock

Large diffs are not rendered by default.

@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "3.3.5"
version = "3.3.6"
name = "source-google-ads"
description = "Source implementation for Google Ads."
authors = [ "Airbyte <contact@airbyte.io>",]
@@ -556,6 +556,37 @@ def query_limit(self) -> Optional[int]:
"""Queries for ChangeStatus resource have to include limit in it"""
return 10000

    def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[MutableMapping[str, Any]]]:
        """Modifies the original stream_slices to return one empty slice for new customers that don't have state yet"""
stream_state = stream_state or {}
for customer in self.customers:
if stream_state.get(customer.id):
start_date = stream_state[customer.id].get(self.cursor_field) or self._start_date
# We should keep backward compatibility with the previous version
elif stream_state.get(self.cursor_field) and len(self.customers) == 1:
start_date = stream_state.get(self.cursor_field) or self._start_date
else:
# child stream doesn't need parent stream as it is used only for the updates
yield {"customer_id": customer.id, "login_customer_id": customer.login_customer_id}
continue

end_date = self._end_date

for chunk in chunk_date_range(
start_date=start_date,
end_date=end_date,
conversion_window=self.conversion_window_days,
days_of_data_storage=self.days_of_data_storage,
time_zone=customer.time_zone,
time_format=self.cursor_time_format,
slice_duration=self.slice_duration,
slice_step=self.slice_step,
):
if chunk:
chunk["customer_id"] = customer.id
chunk["login_customer_id"] = customer.login_customer_id
yield chunk

def read_records(
self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_slice: MutableMapping[str, Any] = None, **kwargs
) -> Iterable[Mapping[str, Any]]:
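For illustration, here is a minimal sketch of what the new ChangeStatus.stream_slices yields when one configured customer already has saved state and another was only just added; the `stream` object and the concrete dates are assumed, mirroring the unit test added further down in this pull request.

import pendulum

# Assumed setup: `stream` is a ChangeStatus instance built over two customers,
# "123" (already synced, has state) and "789" (newly added, no state yet).
pendulum.set_test_now(pendulum.datetime(2023, 11, 2, 12, 53, 7))

stream_state = {"123": {"change_status.last_change_date_time": "2023-11-01 12:36:04.772447"}}
slices = list(stream.stream_slices(stream_state=stream_state))

# Customer "123" resumes from its saved cursor and gets date-bounded chunks,
# while the new customer "789" gets a single slice without a date range, so
# all of its records are read on the first sync:
# [{"start_date": "2023-11-01 12:36:04.772447", "end_date": "2023-11-02 00:00:00.000000",
#   "customer_id": "123", "login_customer_id": None},
#  {"customer_id": "789", "login_customer_id": None}]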
@@ -658,39 +689,11 @@ def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Ite
"""
If state exists read updates from parent stream otherwise return slices with only customer id to sync all records for stream
"""
if stream_state:
slices_generator = self.read_parent_stream(SyncMode.incremental, self.parent_cursor_field, stream_state)
yield from slices_generator
else:
for customer in self.customers:
yield {
"customer_id": customer.id,
"login_customer_id": customer.login_customer_id,
"updated_ids": set(),
"deleted_ids": set(),
"record_changed_time_map": dict(),
}

def _process_parent_record(self, parent_record: MutableMapping[str, Any], child_slice: MutableMapping[str, Any]) -> bool:
"""Process a single parent_record and update the child_slice."""
substream_id = parent_record.get(self.parent_id_field)
if not substream_id:
return False

# Save time of change
child_slice["record_changed_time_map"][substream_id] = parent_record[self.parent_cursor_field]

# Add record id to list of changed or deleted items depending on status
slice_id_list = "deleted_ids" if parent_record.get("change_status.resource_status") == "REMOVED" else "updated_ids"
child_slice[slice_id_list].add(substream_id)

return True

def read_parent_stream(
self, sync_mode: SyncMode, cursor_field: Optional[str], stream_state: Mapping[str, Any]
) -> Iterable[Mapping[str, Any]]:
stream_state = stream_state or {}
for parent_slice in self.parent_stream.stream_slices(
sync_mode=sync_mode, cursor_field=cursor_field, stream_state=stream_state.get(self.parent_stream_name)
sync_mode=SyncMode.incremental,
cursor_field=self.parent_cursor_field,
stream_state=stream_state.get(self.parent_stream_name, {}),
):
customer_id = parent_slice.get("customer_id")
child_slice = {
@@ -705,13 +708,30 @@ def read_parent_stream(
continue

parent_slice["resource_type"] = self.resource_type
for parent_record in self.parent_stream.read_records(sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=parent_slice):
for parent_record in self.parent_stream.read_records(
sync_mode=SyncMode.incremental, cursor_field=self.parent_cursor_field, stream_slice=parent_slice
):
self._process_parent_record(parent_record, child_slice)

            # yield child slice if any records were read
if child_slice["record_changed_time_map"]:
yield child_slice

def _process_parent_record(self, parent_record: MutableMapping[str, Any], child_slice: MutableMapping[str, Any]) -> bool:
"""Process a single parent_record and update the child_slice."""
substream_id = parent_record.get(self.parent_id_field)
if not substream_id:
return False

# Save time of change
child_slice["record_changed_time_map"][substream_id] = parent_record[self.parent_cursor_field]

# Add record id to list of changed or deleted items depending on status
slice_id_list = "deleted_ids" if parent_record.get("change_status.resource_status") == "REMOVED" else "updated_ids"
child_slice[slice_id_list].add(substream_id)

return True

def parse_response(self, response: SearchPager, stream_slice: MutableMapping[str, Any] = None) -> Iterable[Mapping]:
# update records with time obtained from parent stream
for record in super().parse_response(response):
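To make the refactored incremental events slicing concrete, here is a minimal sketch of the slice shapes a child stream such as CampaignCriterion produces; the `stream` object is assumed, and the ids and timestamps mirror the unit test added below.

# Assumed setup: `stream` is a CampaignCriterion instance over customers "123"
# (present in the saved ChangeStatus state) and "789" (newly added, no state).
stream_state = {"change_status": {"123": {"change_status.last_change_date_time": "2023-06-12 13:20:01.003295"}}}
slices = list(stream.stream_slices(stream_state=stream_state))

# For "123" the slice is driven by the parent ChangeStatus stream and carries
# the ids of updated/removed records plus the time each record changed:
# {"customer_id": "123", "updated_ids": {"1", "2"}, "deleted_ids": {"3", "4"},
#  "record_changed_time_map": {"1": "2023-06-13 12:36:01.772447", ...},
#  "login_customer_id": None}
#
# The new customer "789" gets an "empty" slice, so the stream reads all of its
# records directly instead of failing on the missing state:
# {"customer_id": "789", "updated_ids": set(), "deleted_ids": set(),
#  "record_changed_time_map": {}, "login_customer_id": None}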
@@ -55,6 +55,10 @@ def mock_oauth_call(requests_mock):
def customers(config):
return [CustomerModel(id=_id, time_zone="local", is_manager_account=False) for _id in config["customer_id"].split(",")]

@pytest.fixture
def additional_customers(config, customers):
return customers + [CustomerModel(id="789", time_zone="local", is_manager_account=False)]


@pytest.fixture
def customers_manager(config):
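As a purely illustrative usage of the new fixture (hypothetical test, not part of this pull request), additional_customers simply extends the configured accounts with an extra customer "789" that has no saved state yet:

def test_additional_customers_fixture(customers, additional_customers):
    # The fixture reuses the existing `customers` fixture and appends one more
    # non-manager account, which the tests below treat as a brand-new customer.
    assert [c.id for c in additional_customers] == [c.id for c in customers] + ["789"]
    assert additional_customers[-1].is_manager_account is False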
@@ -81,6 +81,54 @@ def test_change_status_stream(config, customers):
stream.get_query.assert_called_with({"customer_id": customer_id, "login_customer_id": "default"})


def test_change_status_stream_slices(config, additional_customers):
""" Change status stream slices should return correct empty slices for the new customers """
google_api = MockGoogleAds(credentials=config["credentials"])

stream = ChangeStatus(api=google_api, customers=additional_customers)

now = pendulum.datetime(2023, 11, 2, 12, 53, 7)
pendulum.set_test_now(now)

stream_state = {"123": {"change_status.last_change_date_time": "2023-11-01 12:36:04.772447"}}

result_slices = list(stream.stream_slices(stream_state=stream_state))
assert len(result_slices) == 2
assert result_slices == [{'start_date': '2023-11-01 12:36:04.772447', 'end_date': '2023-11-02 00:00:00.000000', 'customer_id': '123',
'login_customer_id': None}, {'customer_id': '789', 'login_customer_id': None}]


def test_incremental_events_stream_slices(config, additional_customers):
""" Test if the empty slice will be produced for the new customers """
stream_state = {"change_status": {"123": {"change_status.last_change_date_time": "2023-06-12 13:20:01.003295"}}}

google_api = MockGoogleAds(credentials=config["credentials"])

stream = CampaignCriterion(api=google_api, customers=additional_customers)
parent_stream = stream.parent_stream

parent_stream.get_query = Mock()
parent_stream.get_query.return_value = "query_parent"

parent_stream.state = stream_state["change_status"]

stream.get_query = Mock()
stream.get_query.return_value = "query_child"

now = pendulum.datetime(2023, 6, 15, 12, 53, 7)
pendulum.set_test_now(now)

stream_slices = list(stream.stream_slices(stream_state=stream_state))

assert len(stream_slices) == 2
assert stream_slices == [{'customer_id': '123', 'updated_ids': {'2', '1'}, 'deleted_ids': {'3', '4'},
'record_changed_time_map': {'1': '2023-06-13 12:36:01.772447', '2': '2023-06-13 12:36:02.772447',
'3': '2023-06-13 12:36:03.772447', '4': '2023-06-13 12:36:04.772447'},
'login_customer_id': None},
{'customer_id': '789', 'updated_ids': set(), 'deleted_ids': set(), 'record_changed_time_map': {},
'login_customer_id': None}]


def test_child_incremental_events_read(config, customers):
"""
Page token expired while reading records on date 2021-01-03
1 change: 1 addition & 0 deletions docs/integrations/sources/google-ads.md
@@ -280,6 +280,7 @@ Due to a limitation in the Google Ads API which does not allow getting performan

| Version | Date | Pull Request | Subject |
|:---------|:-----------|:---------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------------------|
| `3.3.6` | 2024-03-01 | [35664](https://github.com/airbytehq/airbyte/pull/35664) | Fix error for new customers for incremental events streams |
| `3.3.5` | 2024-02-28 | [35709](https://github.com/airbytehq/airbyte/pull/35709) | Handle 2-Step Verification exception as config error |
| `3.3.4` | 2024-02-21 | [35493](https://github.com/airbytehq/airbyte/pull/35493) | Rolling back the patch 3.3.3 made for `user_interest` steam |
| `3.3.3` | 2024-02-14 | [35280](https://github.com/airbytehq/airbyte/pull/35280) | Temporary patch that disables some fields to avoid 500 error when syncing `user_interest` steam |