Skip to content

Commit

Permalink
🐛 Source Google Analytics 4 (GA4): improve rate limits messages and retry logic (airbytehq#23822)
Browse files Browse the repository at this point in the history
  • Loading branch information
bazarnov authored and adriennevermorel committed Mar 17, 2023
1 parent c4a423e commit ee496db
Show file tree
Hide file tree
Showing 13 changed files with 475 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -754,11 +754,16 @@
- name: Google Analytics 4 (GA4)
sourceDefinitionId: 3cc2eafd-84aa-4dca-93af-322d9dfeec1a
dockerRepository: airbyte/source-google-analytics-data-api
dockerImageTag: 0.1.1
dockerImageTag: 0.1.2
documentationUrl: https://docs.airbyte.com/integrations/sources/google-analytics-v4
icon: google-analytics.svg
sourceType: api
releaseStage: beta
allowedHosts:
hosts:
- oauth2.googleapis.com
- www.googleapis.com
- analyticsdata.googleapis.com
- name: Google Directory
sourceDefinitionId: d19ae824-e289-4b14-995a-0632eb46d246
dockerRepository: airbyte/source-google-directory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5739,7 +5739,7 @@
oauthFlowOutputParameters:
- - "access_token"
- - "refresh_token"
- dockerImage: "airbyte/source-google-analytics-data-api:0.1.1"
- dockerImage: "airbyte/source-google-analytics-data-api:0.1.2"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/google-analytics-v4"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ COPY source_google_analytics_data_api ./source_google_analytics_data_api
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.name=airbyte/source-google-analytics-data-api
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integrat
Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named.
First install test dependencies into your virtual environment:
```
pip install .[tests]
pip install '.[tests]'
```
### Unit Tests
To run unit tests locally, from the connector directory run:
Expand All @@ -99,7 +99,8 @@ Customize `acceptance-test-config.yml` file to configure tests. See [Connector A
If your connector requires to create or destroy resources for use during acceptance tests create fixtures for it and place them inside integration_tests/acceptance.py.
To run your integration tests with acceptance tests, from the connector root, run
```
python -m pytest integration_tests -p integration_tests.acceptance
docker build . --no-cache -t airbyte/source-google-analytics-data-api:dev \
&& python -m pytest -p connector_acceptance_test.plugin
```
To run your integration tests with docker

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ acceptance_tests:
spec:
tests:
- spec_path: "source_google_analytics_data_api/spec.json"
backward_compatibility_tests_config:
disable_for_version: "0.0.3"
connection:
tests:
- config_path: "secrets/config.json"
Expand All @@ -16,8 +14,6 @@ acceptance_tests:
discovery:
tests:
- config_path: "secrets/config.json"
backward_compatibility_tests_config:
disable_for_version: "0.0.2"
basic_read:
tests:
- config_path: "secrets/config.json"
Expand All @@ -27,14 +23,30 @@ acceptance_tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
ignored_fields:
"daily_active_users": ["uuid"]
"weekly_active_users": ["uuid"]
"four_weekly_active_users": ["uuid"]
"devices": ["uuid"]
"locations": ["uuid"]
"pages": ["uuid"]
"traffic_sources": ["uuid"]
"website_overview": ["uuid"]
daily_active_users:
- name: "uuid"
bypass_reason: "property is changing from sync to sync"
weekly_active_users:
- name: "uuid"
bypass_reason: "property is changing from sync to sync"
four_weekly_active_users:
- name: "uuid"
bypass_reason: "property is changing from sync to sync"
devices:
- name: "uuid"
bypass_reason: "property is changing from sync to sync"
locations:
- name: "uuid"
bypass_reason: "property is changing from sync to sync"
pages:
- name: "uuid"
bypass_reason: "property is changing from sync to sync"
traffic_sources:
- name: "uuid"
bypass_reason: "property is changing from sync to sync"
website_overview:
- name: "uuid"
bypass_reason: "property is changing from sync to sync"
incremental:
tests:
- config_path: "secrets/config.json"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@

from setuptools import find_packages, setup

MAIN_REQUIREMENTS = ["airbyte-cdk~=0.16", "PyJWT==2.4.0", "cryptography==37.0.4", "requests==2.28.1"]
MAIN_REQUIREMENTS = ["airbyte-cdk", "PyJWT==2.4.0", "cryptography==37.0.4", "requests"]

TEST_REQUIREMENTS = [
"freezegun",
"pytest~=6.1",
"pytest-mock~=3.6.1",
"requests-mock~=1.9",
"requests-mock",
"connector-acceptance-test",
]

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


import logging
from functools import wraps
from typing import Any, Iterable, Mapping, Optional

import requests


class GoogleAnalyticsApiQuotaBase:
    """
    Tracks Google Analytics 4 `propertyQuota` usage reported by the Data API and
    translates known quota-exhaustion error messages into retry scenarios:
    backoff time, whether to retry, whether to raise on HTTP errors, and whether
    to stop producing new stream slices.
    """

    # Airbyte Logger
    logger = logging.getLogger("airbyte")
    # initial quota placeholder; captured once from the first response that exposes quota info
    initial_quota: Optional[Mapping[str, Any]] = None
    # the % value cutoff, crossing which will trigger
    # setting the scenario values for attrs prior to the 429 error
    # NOTE(review): attribute name keeps the original spelling ("treshold", i.e. threshold)
    # in case other modules reference it — confirm before renaming.
    treshold: float = 0.1
    # base attrs, read by the stream's `should_retry` / `backoff_time` hooks
    should_retry: Optional[bool] = True
    backoff_time: Optional[int] = None
    raise_on_http_errors: bool = True
    # stop making new slices globally
    stop_iter: bool = False
    # mapping with scenarios for each quota kind
    quota_mapping: Mapping[str, Any] = {
        "concurrentRequests": {
            "error_pattern": "Exhausted concurrent requests quota.",
            "backoff": 30,
            "should_retry": True,
            "raise_on_http_errors": False,
            "stop_iter": False,
        },
        "tokensPerProjectPerHour": {
            "error_pattern": "Exhausted property tokens for a project per hour.",
            "backoff": 1800,
            "should_retry": True,
            "raise_on_http_errors": False,
            "stop_iter": False,
        },
        # TODO: The next scenarios are commented out for now.
        # When we face one of these at least 1 time,
        # we should be able to uncomment the one that matches the criteria
        # and fill-in the `error_pattern` to track that quota as well.
        # IMPORTANT: PLEASE DO NOT REMOVE the scenarios down below!
        # 'tokensPerDay': {
        #     'error_pattern': "___",
        #     "backoff": None,
        #     "should_retry": False,
        #     "raise_on_http_errors": False,
        #     "stop_iter": True,
        # },
        # 'tokensPerHour': {
        #     'error_pattern': "___",
        #     "backoff": 1800,
        #     "should_retry": True,
        #     "raise_on_http_errors": False,
        #     "stop_iter": False,
        # },
        # 'serverErrorsPerProjectPerHour': {
        #     'error_pattern': "___",
        #     "backoff": 3600,
        #     "should_retry": True,
        #     "raise_on_http_errors": False,
        #     "stop_iter": False,
        # },
        # 'potentiallyThresholdedRequestsPerHour': {
        #     'error_pattern': "___",
        #     "backoff": 1800,
        #     "should_retry": True,
        #     "raise_on_http_errors": False,
        #     "stop_iter": False,
        # },
    }

    def _get_known_quota_list(self) -> Iterable[str]:
        """Return the quota kinds this handler knows how to track."""
        return self.quota_mapping.keys()

    def _get_initial_quota_value(self, quota_name: str) -> int:
        """
        Return the initially observed `remaining` value for `quota_name`,
        clamped to a minimum of 1 so downstream percentage math never divides by zero.
        A quota kind missing from the initial snapshot is treated defensively as 1.
        """
        init_remaining = (self.initial_quota or {}).get(quota_name, {}).get("remaining")
        # before the 429 is hit the `remaining` could become -1 or 0
        return 1 if init_remaining is None or init_remaining <= 0 else init_remaining

    def _get_quota_name_from_error_message(self, error_msg: str) -> Optional[str]:
        """Match `error_msg` against the known `error_pattern`s; None when unrecognized."""
        for quota, value in self.quota_mapping.items():
            if value.get("error_pattern") in error_msg:
                return quota
        return None

    def _get_known_quota_from_response(self, property_quota: Mapping[str, Any]) -> Mapping[str, Any]:
        """Reduce the API-reported `propertyQuota` to the kinds listed in `quota_mapping`."""
        current_quota = {}
        for quota in property_quota.keys():
            if quota in self._get_known_quota_list():
                current_quota.update(**{quota: property_quota.get(quota)})
        return current_quota

    def _set_retry_attrs_for_quota(self, quota_name: str) -> None:
        """Apply the retry scenario configured for `quota_name` to the base attrs."""
        quota = self.quota_mapping.get(quota_name, {})
        if quota:
            self.should_retry = quota.get("should_retry")
            self.raise_on_http_errors = quota.get("raise_on_http_errors")
            self.stop_iter = quota.get("stop_iter")
            self.backoff_time = quota.get("backoff")

    def _set_default_retry_attrs(self) -> None:
        """Reset retry attrs to the optimistic defaults (retry, no backoff, raise, keep slicing)."""
        self.should_retry = True
        self.backoff_time = None
        self.raise_on_http_errors = True
        self.stop_iter = False

    def _set_initial_quota(self, current_quota: Optional[Mapping[str, Any]] = None) -> None:
        """Capture the first observed quota snapshot; later calls are no-ops."""
        if not self.initial_quota:
            self.initial_quota = current_quota

    def _check_remaining_quota(self, current_quota: Mapping[str, Any]) -> None:
        """Warn and pre-arm the matching retry scenario once any quota drops below the cutoff."""
        for quota_name, quota_value in current_quota.items():
            total_available = self._get_initial_quota_value(quota_name)
            remaining: int = quota_value.get("remaining")
            remaining_percent: float = remaining / total_available
            # make an early stop if we faced with the quota that is going to run out
            if remaining_percent <= self.treshold:
                self.logger.warning(f"The `{quota_name}` quota is running out of tokens. Available {remaining} out of {total_available}.")
                self._set_retry_attrs_for_quota(quota_name)
                return None

    def _check_for_errors(self, response: "requests.Response") -> None:
        """
        Inspect an error payload, if present, and arm the retry scenario for the
        quota kind named in the error message. Non-JSON responses are bypassed.
        """
        try:
            # revert to default values after successful retry
            self._set_default_retry_attrs()
            error = response.json().get("error")
            if error:
                quota_name = self._get_quota_name_from_error_message(error.get("message"))
                if quota_name:
                    self._set_retry_attrs_for_quota(quota_name)
                    self.logger.warning(f"The `{quota_name}` quota is exceeded!")
                    return None
        except (AttributeError, ValueError) as attr_e:
            # AttributeError: `response` is None or the payload shape is unexpected;
            # ValueError: `.json()` failed to decode a non-JSON body
            # (requests raises a ValueError subclass on decode failure).
            self.logger.warning(
                f"`GoogleAnalyticsApiQuota._check_for_errors`: Received non JSON response from the API. Full error: {attr_e}. Bypassing."
            )
        except Exception as e:
            self.logger.fatal(f"Other `GoogleAnalyticsApiQuota` error: {e}")
            raise


class GoogleAnalyticsApiQuota(GoogleAnalyticsApiQuotaBase):
    """Concrete quota handler: inspects each API response and updates the retry attrs."""

    def _check_quota(self, response: "requests.Response"):
        """
        Parse `propertyQuota` from `response` (when present) and update the retry
        state; otherwise fall back to error-message inspection.
        """
        # try get json from response
        try:
            parsed_response = response.json()
        except (AttributeError, ValueError) as e:
            # AttributeError: `response` is None;
            # ValueError: non-JSON body (requests raises a ValueError subclass on decode failure)
            self.logger.warning(
                f"`GoogleAnalyticsApiQuota._check_quota`: Received non JSON response from the API. Full error: {e}. Bypassing."
            )
            parsed_response = {}
        # get current quota
        property_quota: dict = parsed_response.get("propertyQuota")
        if property_quota:
            # return default attrs values once successfully retried
            # or until another 429 error is hit
            self._set_default_retry_attrs()
            # reduce quota list to known kinds only
            current_quota = self._get_known_quota_from_response(property_quota)
            if current_quota:
                # save the initial quota
                self._set_initial_quota(current_quota)
                # check for remaining quota
                self._check_remaining_quota(current_quota)
        else:
            self._check_for_errors(response)

    def handle_quota(self) -> None:
        """
        The function decorator is used to integrate with the `should_retry` method,
        or any other method that provides early access to the `response` object.
        """

        def decorator(func):
            @wraps(func)
            def wrapper_handle_quota(*args, **kwargs):
                # find the first requests.Response inside the args list;
                # stop at the first match so a later non-Response arg
                # cannot overwrite it with None (bug in the naive loop)
                response = next(
                    (arg for arg in args if isinstance(arg, requests.models.Response)),
                    None,
                )
                # check for the quota
                self._check_quota(response)
                # return actual function
                return func(*args, **kwargs)

            return wrapper_handle_quota

        return decorator
Loading

0 comments on commit ee496db

Please sign in to comment.