Skip to content

Commit

Permalink
Source Google Ads: make streams incremental (#10315)
Browse files Browse the repository at this point in the history
* make streams as incremental

* refactor Source

* fix typing

* fix primary_key

* fix primary key in configured_catalog

* bump the version

* fix primary key in catalog

* updated spec and def yaml

Co-authored-by: auganbay <auganenu@gmail.com>
  • Loading branch information
augan-rymkhan and Augan93 committed Feb 16, 2022
1 parent 828ebc0 commit 741e672
Show file tree
Hide file tree
Showing 11 changed files with 78 additions and 43 deletions.
Expand Up @@ -252,7 +252,7 @@
- name: Google Ads
sourceDefinitionId: 253487c0-2246-43ba-a21f-5116b20a2c50
dockerRepository: airbyte/source-google-ads
dockerImageTag: 0.1.26
dockerImageTag: 0.1.27
documentationUrl: https://docs.airbyte.io/integrations/sources/google-ads
icon: google-adwords.svg
sourceType: api
Expand Down
Expand Up @@ -2306,7 +2306,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-google-ads:0.1.26"
- dockerImage: "airbyte/source-google-ads:0.1.27"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/google-ads"
connectionSpecification:
Expand Down
Expand Up @@ -13,5 +13,5 @@ RUN pip install .

ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.26
LABEL io.airbyte.version=0.1.27
LABEL io.airbyte.name=airbyte/source-google-ads
Expand Up @@ -30,6 +30,7 @@
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"source_defined_primary_key": [["click_view.gclid"], ["segments.date"]],
"default_cursor_field": ["segments.date"]
},
"sync_mode": "incremental",
Expand Down Expand Up @@ -100,41 +101,53 @@
"stream": {
"name": "ad_group_ads",
"json_schema": {},
"supported_sync_modes": ["full_refresh"],
"source_defined_primary_key": [["ad_group_ad.ad.id"]]
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"source_defined_primary_key": [["ad_group_ad.ad.id"], ["segments.date"]],
"default_cursor_field": ["segments.date"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
"sync_mode": "incremental",
"destination_sync_mode": "overwrite",
"cursor_field": ["segments.date"]
},
{
"stream": {
"name": "ad_groups",
"json_schema": {},
"supported_sync_modes": ["full_refresh"],
"source_defined_primary_key": [["ad_group.id"]]
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["segments.date"],
"source_defined_primary_key": [["ad_group.id"], ["segments.date"]]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
"sync_mode": "incremental",
"destination_sync_mode": "overwrite",
"cursor_field": ["segments.date"]
},
{
"stream": {
"name": "accounts",
"json_schema": {},
"supported_sync_modes": ["full_refresh"],
"source_defined_primary_key": [["customer.id"]]
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["segments.date"],
"source_defined_primary_key": [["customer.id"], ["segments.date"]]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
"sync_mode": "incremental",
"destination_sync_mode": "overwrite",
"cursor_field": ["segments.date"]
},
{
"stream": {
"name": "campaigns",
"json_schema": {},
"supported_sync_modes": ["full_refresh"],
"source_defined_primary_key": [["campaign.id"]]
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["segments.date"],
"source_defined_primary_key": [["campaign.id"], ["segments.date"]]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
"sync_mode": "incremental",
"destination_sync_mode": "overwrite",
"cursor_field": ["segments.date"]
},
{
"stream": {
Expand Down
Expand Up @@ -64,6 +64,10 @@
},
"customer.tracking_url_template": {
"type": ["null", "string"]
},
"segments.date": {
"type": ["null", "string"],
"format": "date"
}
}
}
Expand Up @@ -565,6 +565,10 @@
},
"ad_group_ad.status": {
"type": ["null", "string"]
},
"segments.date": {
"type": ["null", "string"],
"format": "date"
}
}
}
Expand Up @@ -94,6 +94,10 @@
"items": {
"type": "string"
}
},
"segments.date": {
"type": ["null", "string"],
"format": "date"
}
}
}
Expand Up @@ -238,6 +238,10 @@
},
"campaign.video_brand_safety_suitability": {
"type": ["null", "string"]
},
"segments.date": {
"type": ["null", "string"],
"format": "date"
}
}
}
Expand Up @@ -46,8 +46,19 @@ def get_credentials(config: Mapping[str, Any]) -> Mapping[str, Any]:
return credentials

@staticmethod
def get_account_info(google_api) -> dict:
accounts_streams = Accounts(api=google_api)
def get_incremental_stream_config(google_api: GoogleAds, config: Mapping[str, Any], tz: Union[timezone, str] = "local"):
incremental_stream_config = dict(
api=google_api,
conversion_window_days=config["conversion_window_days"],
start_date=config["start_date"],
time_zone=tz,
end_date=config.get("end_date"),
)
return incremental_stream_config

def get_account_info(self, google_api: GoogleAds, config: Mapping[str, Any]) -> dict:
incremental_stream_config = self.get_incremental_stream_config(google_api, config)
accounts_streams = Accounts(**incremental_stream_config)
for stream_slice in accounts_streams.stream_slices(sync_mode=SyncMode.full_refresh):
return next(accounts_streams.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice), {})

Expand All @@ -74,7 +85,7 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->
try:
logger.info("Checking the config")
google_api = GoogleAds(credentials=self.get_credentials(config), customer_id=config["customer_id"])
account_info = self.get_account_info(google_api)
account_info = self.get_account_info(google_api, config)
is_manager_account = self.is_manager_account(account_info)

# Check custom query request validity by sending metric request with non-existant time window
Expand All @@ -95,22 +106,15 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
google_api = GoogleAds(credentials=self.get_credentials(config), customer_id=config["customer_id"])
account_info = self.get_account_info(google_api)
account_info = self.get_account_info(google_api, config)
time_zone = self.get_time_zone(account_info)
end_date = config.get("end_date")
incremental_stream_config = dict(
api=google_api,
conversion_window_days=config["conversion_window_days"],
start_date=config["start_date"],
time_zone=time_zone,
end_date=end_date,
)
incremental_stream_config = self.get_incremental_stream_config(google_api, config, tz=time_zone)

streams = [
AdGroupAds(api=google_api),
AdGroups(api=google_api),
Accounts(api=google_api),
Campaigns(api=google_api),
AdGroupAds(**incremental_stream_config),
AdGroups(**incremental_stream_config),
Accounts(**incremental_stream_config),
Campaigns(**incremental_stream_config),
ClickView(**incremental_stream_config),
]

Expand Down
Expand Up @@ -215,36 +215,36 @@ def get_query(self, stream_slice: Mapping[str, Any] = None) -> str:
return query


class Accounts(GoogleAdsStream):
class Accounts(IncrementalGoogleAdsStream):
"""
Accounts stream: https://developers.google.com/google-ads/api/fields/v8/customer
"""

primary_key = "customer.id"
primary_key = ["customer.id", "segments.date"]


class Campaigns(GoogleAdsStream):
class Campaigns(IncrementalGoogleAdsStream):
"""
Campaigns stream: https://developers.google.com/google-ads/api/fields/v8/campaign
"""

primary_key = "campaign.id"
primary_key = ["campaign.id", "segments.date"]


class AdGroups(GoogleAdsStream):
class AdGroups(IncrementalGoogleAdsStream):
"""
AdGroups stream: https://developers.google.com/google-ads/api/fields/v8/ad_group
"""

primary_key = "ad_group.id"
primary_key = ["ad_group.id", "segments.date"]


class AdGroupAds(GoogleAdsStream):
class AdGroupAds(IncrementalGoogleAdsStream):
"""
AdGroups stream: https://developers.google.com/google-ads/api/fields/v8/ad_group_ad
"""

primary_key = "ad_group_ad.ad.id"
primary_key = ["ad_group_ad.ad.id", "segments.date"]


class AccountPerformanceReport(IncrementalGoogleAdsStream):
Expand Down Expand Up @@ -306,5 +306,6 @@ class ClickView(IncrementalGoogleAdsStream):
ClickView stream: https://developers.google.com/google-ads/api/reference/rpc/v8/ClickView
"""

primary_key = ["click_view.gclid", "segments.date"]
days_of_data_storage = 90
range_days = 1
1 change: 1 addition & 0 deletions docs/integrations/sources/google-ads.md
Expand Up @@ -102,6 +102,7 @@ This source is constrained by whatever API limits are set for the Google Ads tha

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| `0.1.27` | 2022-02-16 | [10315](https://github.com/airbytehq/airbyte/pull/10315) | Make `ad_group_ads` and other streams support incremental sync. |
| `0.1.26` | 2022-02-11 | [10150](https://github.com/airbytehq/airbyte/pull/10150) | Add support for multiple customer IDs. |
| `0.1.25` | 2022-02-04 | [9812](https://github.com/airbytehq/airbyte/pull/9812) | Handle `EXPIRED_PAGE_TOKEN` exception and retry with updated state. |
| `0.1.24` | 2022-02-04 | [9996](https://github.com/airbytehq/airbyte/pull/9996) | Use Google Ads API version V9. |
Expand Down

0 comments on commit 741e672

Please sign in to comment.