diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 18545cd4e5c7e..0f1f459ef1252 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -754,11 +754,16 @@ - name: Google Analytics 4 (GA4) sourceDefinitionId: 3cc2eafd-84aa-4dca-93af-322d9dfeec1a dockerRepository: airbyte/source-google-analytics-data-api - dockerImageTag: 0.1.1 + dockerImageTag: 0.1.2 documentationUrl: https://docs.airbyte.com/integrations/sources/google-analytics-v4 icon: google-analytics.svg sourceType: api releaseStage: beta + allowedHosts: + hosts: + - oauth2.googleapis.com + - www.googleapis.com + - analyticsdata.googleapis.com - name: Google Directory sourceDefinitionId: d19ae824-e289-4b14-995a-0632eb46d246 dockerRepository: airbyte/source-google-directory diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index cff20084964a0..5e5140406c509 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -5739,7 +5739,7 @@ oauthFlowOutputParameters: - - "access_token" - - "refresh_token" -- dockerImage: "airbyte/source-google-analytics-data-api:0.1.1" +- dockerImage: "airbyte/source-google-analytics-data-api:0.1.2" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/google-analytics-v4" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-google-analytics-data-api/Dockerfile b/airbyte-integrations/connectors/source-google-analytics-data-api/Dockerfile index cd86dac643593..6df850eec5906 100644 --- a/airbyte-integrations/connectors/source-google-analytics-data-api/Dockerfile +++ b/airbyte-integrations/connectors/source-google-analytics-data-api/Dockerfile @@ -28,5 +28,5 @@ COPY source_google_analytics_data_api ./source_google_analytics_data_api ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.1 +LABEL io.airbyte.version=0.1.2 LABEL io.airbyte.name=airbyte/source-google-analytics-data-api diff --git a/airbyte-integrations/connectors/source-google-analytics-data-api/README.md b/airbyte-integrations/connectors/source-google-analytics-data-api/README.md index 7ff91a822ed53..87677dfc98e00 100644 --- a/airbyte-integrations/connectors/source-google-analytics-data-api/README.md +++ b/airbyte-integrations/connectors/source-google-analytics-data-api/README.md @@ -79,7 +79,7 @@ docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integrat Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named. First install test dependencies into your virtual environment: ``` -pip install .[tests] +pip install '.[tests]' ``` ### Unit Tests To run unit tests locally, from the connector directory run: @@ -99,7 +99,8 @@ Customize `acceptance-test-config.yml` file to configure tests. See [Connector A If your connector requires to create or destroy resources for use during acceptance tests create fixtures for it and place them inside integration_tests/acceptance.py. To run your integration tests with acceptance tests, from the connector root, run ``` -python -m pytest integration_tests -p integration_tests.acceptance +docker build . --no-cache -t airbyte/source-google-analytics-data-api:dev \ +&& python -m pytest -p connector_acceptance_test.plugin ``` To run your integration tests with docker diff --git a/airbyte-integrations/connectors/source-google-analytics-data-api/acceptance-test-config.yml b/airbyte-integrations/connectors/source-google-analytics-data-api/acceptance-test-config.yml index 6d0227cd95afb..344b83653ebde 100644 --- a/airbyte-integrations/connectors/source-google-analytics-data-api/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-google-analytics-data-api/acceptance-test-config.yml @@ -5,8 +5,6 @@ acceptance_tests: spec: tests: - spec_path: "source_google_analytics_data_api/spec.json" - backward_compatibility_tests_config: - disable_for_version: "0.0.3" connection: tests: - config_path: "secrets/config.json" @@ -16,8 +14,6 @@ acceptance_tests: discovery: tests: - config_path: "secrets/config.json" - backward_compatibility_tests_config: - disable_for_version: "0.0.2" basic_read: tests: - config_path: "secrets/config.json" @@ -27,14 +23,30 @@ acceptance_tests: - config_path: "secrets/config.json" configured_catalog_path: "integration_tests/configured_catalog.json" ignored_fields: - "daily_active_users": ["uuid"] - "weekly_active_users": ["uuid"] - "four_weekly_active_users": ["uuid"] - "devices": ["uuid"] - "locations": ["uuid"] - "pages": ["uuid"] - "traffic_sources": ["uuid"] - "website_overview": ["uuid"] + daily_active_users: + - name: "uuid" + bypass_reason: "property is changing from sync to sync" + weekly_active_users: + - name: "uuid" + bypass_reason: "property is changing from sync to sync" + four_weekly_active_users: + - name: "uuid" + bypass_reason: "property is changing from sync to sync" + devices: + - name: "uuid" + bypass_reason: "property is changing from sync to sync" + locations: + - name: "uuid" + bypass_reason: "property is changing from sync to sync" + pages: + - name: "uuid" + bypass_reason: "property is changing from sync to sync" + traffic_sources: + - name: "uuid" + bypass_reason: "property is changing from sync to sync" + website_overview: + - name: "uuid" + bypass_reason: "property is changing from sync to sync" incremental: tests: - config_path: "secrets/config.json" diff --git a/airbyte-integrations/connectors/source-google-analytics-data-api/setup.py b/airbyte-integrations/connectors/source-google-analytics-data-api/setup.py index 033bc4bc9e2b7..1b42a7de0691b 100644 --- a/airbyte-integrations/connectors/source-google-analytics-data-api/setup.py +++ b/airbyte-integrations/connectors/source-google-analytics-data-api/setup.py @@ -5,13 +5,13 @@ from setuptools import find_packages, setup -MAIN_REQUIREMENTS = ["airbyte-cdk~=0.16", "PyJWT==2.4.0", "cryptography==37.0.4", "requests==2.28.1"] +MAIN_REQUIREMENTS = ["airbyte-cdk", "PyJWT==2.4.0", "cryptography==37.0.4", "requests"] TEST_REQUIREMENTS = [ "freezegun", "pytest~=6.1", "pytest-mock~=3.6.1", - "requests-mock~=1.9", + "requests-mock", "connector-acceptance-test", ] diff --git a/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/api_quota.py b/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/api_quota.py new file mode 100644 index 0000000000000..7b0aef7e50927 --- /dev/null +++ b/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/api_quota.py @@ -0,0 +1,194 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + + +import logging +from functools import wraps +from typing import Any, Iterable, Mapping, Optional + +import requests + + +class GoogleAnalyticsApiQuotaBase: + # Airbyte Logger + logger = logging.getLogger("airbyte") + # initial quota placeholder + initial_quota: Optional[Mapping[str, Any]] = None + # the % value cutoff, crossing which will trigger + # setting the scenario values for attrs prior to the 429 error + treshold: float = 0.1 + # base attrs + should_retry: Optional[bool] = True + backoff_time: Optional[int] = None + raise_on_http_errors: bool = True + # stop making new slices globaly + stop_iter: bool = False + # mapping with scenarios for each quota kind + quota_mapping: Mapping[str, Any] = { + "concurrentRequests": { + "error_pattern": "Exhausted concurrent requests quota.", + "backoff": 30, + "should_retry": True, + "raise_on_http_errors": False, + "stop_iter": False, + }, + "tokensPerProjectPerHour": { + "error_pattern": "Exhausted property tokens for a project per hour.", + "backoff": 1800, + "should_retry": True, + "raise_on_http_errors": False, + "stop_iter": False, + }, + # TODO: The next scenarious are commented out for now. + # When we face with one of these at least 1 time, + # we should be able to uncomment the one matches the criteria + # and fill-in the `error_pattern` to track that quota as well. + # IMPORTANT: PLEASE DO NOT REMOVE the scenario down bellow! + # 'tokensPerDay': { + # 'error_pattern': "___", + # "backoff": None, + # "should_retry": False, + # "raise_on_http_errors": False, + # "stop_iter": True, + # }, + # 'tokensPerHour': { + # 'error_pattern': "___", + # "backoff": 1800, + # "should_retry": True, + # "raise_on_http_errors": False, + # "stop_iter": False, + # }, + # 'serverErrorsPerProjectPerHour': { + # 'error_pattern': "___", + # "backoff": 3600, + # "should_retry": True, + # "raise_on_http_errors": False, + # "stop_iter": False, + # }, + # 'potentiallyThresholdedRequestsPerHour': { + # 'error_pattern': "___", + # "backoff": 1800, + # "should_retry": True, + # "raise_on_http_errors": False, + # "stop_iter": False, + # }, + } + + def _get_known_quota_list(self) -> Iterable[str]: + return self.quota_mapping.keys() + + def _get_initial_quota_value(self, quota_name: str) -> int: + init_remaining = self.initial_quota.get(quota_name).get("remaining") + # before the 429 is hit the `remaining` could become -1 or 0 + return 1 if init_remaining <= 0 else init_remaining + + def _get_quota_name_from_error_message(self, error_msg: str) -> Optional[str]: + for quota, value in self.quota_mapping.items(): + if value.get("error_pattern") in error_msg: + return quota + return None + + def _get_known_quota_from_response(self, property_quota: Mapping[str, Any]) -> Mapping[str, Any]: + current_quota = {} + for quota in property_quota.keys(): + if quota in self._get_known_quota_list(): + current_quota.update(**{quota: property_quota.get(quota)}) + return current_quota + + def _set_retry_attrs_for_quota(self, quota_name: str) -> None: + quota = self.quota_mapping.get(quota_name, {}) + if quota: + self.should_retry = quota.get("should_retry") + self.raise_on_http_errors = quota.get("raise_on_http_errors") + self.stop_iter = quota.get("stop_iter") + self.backoff_time = quota.get("backoff") + + def _set_default_retry_attrs(self) -> None: + self.should_retry = True + self.backoff_time = None + self.raise_on_http_errors = True + self.stop_iter = False + + def _set_initial_quota(self, current_quota: Optional[Mapping[str, Any]] = None) -> None: + if not self.initial_quota: + self.initial_quota = current_quota + + def _check_remaining_quota(self, current_quota: Mapping[str, Any]) -> None: + for quota_name, quota_value in current_quota.items(): + total_available = self._get_initial_quota_value(quota_name) + remaining: int = quota_value.get("remaining") + remaining_percent: float = remaining / total_available + # make an early stop if we faced with the quota that is going to run out + if remaining_percent <= self.treshold: + self.logger.warn(f"The `{quota_name}` quota is running out of tokens. Available {remaining} out of {total_available}.") + self._set_retry_attrs_for_quota(quota_name) + return None + + def _check_for_errors(self, response: requests.Response) -> None: + try: + # revert to default values after successul retry + self._set_default_retry_attrs() + error = response.json().get("error") + if error: + quota_name = self._get_quota_name_from_error_message(error.get("message")) + if quota_name: + self._set_retry_attrs_for_quota(quota_name) + self.logger.warn(f"The `{quota_name}` quota is exceeded!") + return None + except AttributeError as attr_e: + self.logger.warn( + f"`GoogleAnalyticsApiQuota._check_for_errors`: Received non JSON response from the API. Full error: {attr_e}. Bypassing." + ) + pass + except Exception as e: + self.logger.fatal(f"Other `GoogleAnalyticsApiQuota` error: {e}") + raise + + +class GoogleAnalyticsApiQuota(GoogleAnalyticsApiQuotaBase): + def _check_quota(self, response: requests.Response): + # try get json from response + try: + parsed_response = response.json() + except AttributeError as e: + self.logger.warn( + f"`GoogleAnalyticsApiQuota._check_quota`: Received non JSON response from the API. Full error: {e}. Bypassing." + ) + parsed_response = {} + # get current quota + property_quota: dict = parsed_response.get("propertyQuota") + if property_quota: + # return default attrs values once successfully retried + # or until another 429 error is hit + self._set_default_retry_attrs() + # reduce quota list to known kinds only + current_quota = self._get_known_quota_from_response(property_quota) + if current_quota: + # save the initial quota + self._set_initial_quota(current_quota) + # check for remaining quota + self._check_remaining_quota(current_quota) + else: + self._check_for_errors(response) + + def handle_quota(self) -> None: + """ + The function decorator is used to integrate with the `should_retry` method, + or any other method that provides early access to the `response` object. + """ + + def decorator(func): + @wraps(func) + def wrapper_handle_quota(*args, **kwargs): + # find the requests.Response inside args list + for arg in args: + response = arg if isinstance(arg, requests.models.Response) else None + # check for the quota + self._check_quota(response) + # return actual function + return func(*args, **kwargs) + + return wrapper_handle_quota + + return decorator diff --git a/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/source.py b/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/source.py index 6b839a8861425..b15e156143bb5 100644 --- a/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/source.py +++ b/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/source.py @@ -15,70 +15,16 @@ from airbyte_cdk.models import SyncMode from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.streams import Stream -from airbyte_cdk.sources.streams.http import HttpStream, auth +from airbyte_cdk.sources.streams.http import HttpStream from source_google_analytics_data_api import utils -from source_google_analytics_data_api.authenticator import GoogleServiceKeyAuthenticator from source_google_analytics_data_api.utils import DATE_FORMAT -metrics_data_types_map: Dict = { - "METRIC_TYPE_UNSPECIFIED": "string", - "TYPE_INTEGER": "integer", - "TYPE_FLOAT": "number", - "TYPE_SECONDS": "number", - "TYPE_MILLISECONDS": "number", - "TYPE_MINUTES": "number", - "TYPE_HOURS": "number", - "TYPE_STANDARD": "number", - "TYPE_CURRENCY": "number", - "TYPE_FEET": "number", - "TYPE_MILES": "number", - "TYPE_METERS": "number", - "TYPE_KILOMETERS": "number", -} - - -def get_metrics_type(t: str) -> str: - return metrics_data_types_map.get(t, "number") - - -metrics_data_native_types_map: Dict = { - "METRIC_TYPE_UNSPECIFIED": str, - "TYPE_INTEGER": int, - "TYPE_FLOAT": float, - "TYPE_SECONDS": float, - "TYPE_MILLISECONDS": float, - "TYPE_MINUTES": float, - "TYPE_HOURS": float, - "TYPE_STANDARD": float, - "TYPE_CURRENCY": float, - "TYPE_FEET": float, - "TYPE_MILES": float, - "TYPE_METERS": float, - "TYPE_KILOMETERS": float, -} - - -def metrics_type_to_python(t: str) -> type: - return metrics_data_native_types_map.get(t, str) - - -def get_dimensions_type(d: str) -> str: - return "string" - - -authenticator_class_map: Dict = { - "Service": (GoogleServiceKeyAuthenticator, lambda credentials: {"credentials": credentials["credentials_json"]}), - "Client": ( - auth.Oauth2Authenticator, - lambda credentials: { - "token_refresh_endpoint": "https://oauth2.googleapis.com/token", - "scopes": ["https://www.googleapis.com/auth/analytics.readonly"], - "client_secret": credentials["client_secret"], - "client_id": credentials["client_id"], - "refresh_token": credentials["refresh_token"], - }, - ), -} +from .api_quota import GoogleAnalyticsApiQuota +from .utils import authenticator_class_map, get_dimensions_type, get_metrics_type, metrics_type_to_python + +# set the quota handler globaly since limitations are the same for all streams +# the initial values should be saved once and tracked for each stream, inclusivelly. +GoogleAnalyticsQuotaHandler: GoogleAnalyticsApiQuota = GoogleAnalyticsApiQuota() class ConfigurationError(Exception): @@ -96,8 +42,8 @@ def __get__(self, instance, owner): if not metadata: raise Exception("failed to get metadata, over quota, try later") self._metadata = { - "dimensions": {m["apiName"]: m for m in metadata["dimensions"]}, - "metrics": {m["apiName"]: m for m in metadata["metrics"]}, + "dimensions": {m.get("apiName"): m for m in metadata.get("dimensions", [{}])}, + "metrics": {m.get("apiName"): m for m in metadata.get("metrics", [{}])}, } return self._metadata @@ -106,28 +52,32 @@ def __get__(self, instance, owner): class GoogleAnalyticsDataApiAbstractStream(HttpStream, ABC): url_base = "https://analyticsdata.googleapis.com/v1beta/" http_method = "POST" + raise_on_http_errors = True def __init__(self, *, config: Mapping[str, Any], **kwargs): super().__init__(**kwargs) self._config = config - self._stop_iteration = False @property def config(self): return self._config + # handle the quota errors with prepared values for: + # `should_retry`, `backoff_time`, `raise_on_http_errors`, `stop_iter` based on quota scenario. + @GoogleAnalyticsQuotaHandler.handle_quota() def should_retry(self, response: requests.Response) -> bool: - if response.status_code == 429: - return False + if response.status_code == requests.codes.too_many_requests: + setattr(self, "raise_on_http_errors", GoogleAnalyticsQuotaHandler.raise_on_http_errors) + return GoogleAnalyticsQuotaHandler.should_retry + # for all other cases not covered by GoogleAnalyticsQuotaHandler return super().should_retry(response) - def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]: - try: - yield from super().read_records(**kwargs) - except requests.exceptions.HTTPError as e: - self._stop_iteration = True - if e.response.status_code != 429: - raise e + def backoff_time(self, response: requests.Response) -> Optional[float]: + # handle the error with prepared GoogleAnalyticsQuotaHandler backoff value + if response.status_code == requests.codes.too_many_requests: + return GoogleAnalyticsQuotaHandler.backoff_time + # for all other cases not covered by GoogleAnalyticsQuotaHandler + return super().backoff_time(response) class GoogleAnalyticsDataApiBaseStream(GoogleAnalyticsDataApiAbstractStream): @@ -221,9 +171,9 @@ def parse_response( ) -> Iterable[Mapping]: r = response.json() - dimensions = [h["name"] for h in r["dimensionHeaders"]] - metrics = [h["name"] for h in r["metricHeaders"]] - metrics_type_map = {h["name"]: h["type"] for h in r["metricHeaders"]} + dimensions = [h.get("name") for h in r.get("dimensionHeaders", [{}])] + metrics = [h.get("name") for h in r.get("metricHeaders", [{}])] + metrics_type_map = {h.get("name"): h.get("type") for h in r.get("metricHeaders", [{}])} for row in r.get("rows", []): yield self.add_primary_key() | self.add_property_id(self.config["property_id"]) | self.add_dimensions( @@ -245,11 +195,13 @@ def request_body_json( stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None, ) -> Optional[Mapping]: - return { + payload = { "metrics": [{"name": m} for m in self.config["metrics"]], "dimensions": [{"name": d} for d in self.config["dimensions"]], "dateRanges": [stream_slice], + "returnPropertyQuota": True, } + return payload def stream_slices( self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None @@ -265,14 +217,16 @@ def stream_slices( start_date = self.config["date_ranges_start_date"] while start_date <= today: - if self._stop_iteration: - return - - yield { - "startDate": utils.date_to_string(start_date), - "endDate": utils.date_to_string(min(start_date + datetime.timedelta(days=self.config["window_in_days"] - 1), today)), - } - start_date += datetime.timedelta(days=self.config["window_in_days"]) + # stop producing slices if 429 + specific scenario is hit + # see GoogleAnalyticsQuotaHandler for more info. + if GoogleAnalyticsQuotaHandler.stop_iter: + return [] + else: + yield { + "startDate": utils.date_to_string(start_date), + "endDate": utils.date_to_string(min(start_date + datetime.timedelta(days=self.config["window_in_days"] - 1), today)), + } + start_date += datetime.timedelta(days=self.config["window_in_days"]) class GoogleAnalyticsDataApiMetadataStream(GoogleAnalyticsDataApiAbstractStream): diff --git a/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/utils.py b/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/utils.py index bde1efe1ac4c1..bce85dd46983b 100644 --- a/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/utils.py +++ b/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/utils.py @@ -2,11 +2,62 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # + import calendar import datetime +from typing import Dict + +from airbyte_cdk.sources.streams.http import auth +from source_google_analytics_data_api.authenticator import GoogleServiceKeyAuthenticator DATE_FORMAT = "%Y-%m-%d" +metrics_data_native_types_map: Dict = { + "METRIC_TYPE_UNSPECIFIED": str, + "TYPE_INTEGER": int, + "TYPE_FLOAT": float, + "TYPE_SECONDS": float, + "TYPE_MILLISECONDS": float, + "TYPE_MINUTES": float, + "TYPE_HOURS": float, + "TYPE_STANDARD": float, + "TYPE_CURRENCY": float, + "TYPE_FEET": float, + "TYPE_MILES": float, + "TYPE_METERS": float, + "TYPE_KILOMETERS": float, +} + +metrics_data_types_map: Dict = { + "METRIC_TYPE_UNSPECIFIED": "string", + "TYPE_INTEGER": "integer", + "TYPE_FLOAT": "number", + "TYPE_SECONDS": "number", + "TYPE_MILLISECONDS": "number", + "TYPE_MINUTES": "number", + "TYPE_HOURS": "number", + "TYPE_STANDARD": "number", + "TYPE_CURRENCY": "number", + "TYPE_FEET": "number", + "TYPE_MILES": "number", + "TYPE_METERS": "number", + "TYPE_KILOMETERS": "number", +} + +authenticator_class_map: Dict = { + "Service": (GoogleServiceKeyAuthenticator, lambda credentials: {"credentials": credentials["credentials_json"]}), + "Client": ( + auth.Oauth2Authenticator, + lambda credentials: { + "token_refresh_endpoint": "https://oauth2.googleapis.com/token", + "scopes": ["https://www.googleapis.com/auth/analytics.readonly"], + "client_secret": credentials["client_secret"], + "client_id": credentials["client_id"], + "refresh_token": credentials["refresh_token"], + }, + ), +} + def datetime_to_secs(dt: datetime.datetime) -> int: return calendar.timegm(dt.utctimetuple()) @@ -25,3 +76,15 @@ def string_to_date(d: str, f: str = DATE_FORMAT, old_format=None) -> datetime.da def date_to_string(d: datetime.date, f: str = DATE_FORMAT) -> str: return d.strftime(f) + + +def get_metrics_type(t: str) -> str: + return metrics_data_types_map.get(t, "number") + + +def metrics_type_to_python(t: str) -> type: + return metrics_data_native_types_map.get(t, str) + + +def get_dimensions_type(d: str) -> str: + return "string" diff --git a/airbyte-integrations/connectors/source-google-analytics-data-api/unit_tests/test_api_quota.py b/airbyte-integrations/connectors/source-google-analytics-data-api/unit_tests/test_api_quota.py new file mode 100644 index 0000000000000..1f2210b92e19f --- /dev/null +++ b/airbyte-integrations/connectors/source-google-analytics-data-api/unit_tests/test_api_quota.py @@ -0,0 +1,138 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +import pytest +import requests +from source_google_analytics_data_api.api_quota import GoogleAnalyticsApiQuota + +TEST_QUOTA_INSTANCE: GoogleAnalyticsApiQuota = GoogleAnalyticsApiQuota() + + +@pytest.fixture(name='expected_quota_list') +def expected_quota_list(): + """ The Quota were currently handle """ + return ['concurrentRequests', 'tokensPerProjectPerHour'] + + +def test_check_initial_quota_is_empty(): + """ + Check the initial quota property is empty (== None), but ready to be fullfield. + """ + assert not TEST_QUOTA_INSTANCE.initial_quota + + +@pytest.mark.parametrize( + ("response_quota", "partial_quota", "should_retry_exp", "backoff_time_exp", "raise_on_http_errors_exp", "stop_iter_exp"), + [ + ( + { + 'propertyQuota': { + 'concurrentRequests': { + 'consumed': 0, + 'remaining': 10 + }, + 'tokensPerProjectPerHour': { + 'consumed': 1, + 'remaining': 1735 + } + } + }, + False, True, None, True, False, + ), + ( + { + 'propertyQuota': { + 'concurrentRequests': { + 'consumed': 0, + 'remaining': 10 + }, + 'tokensPerProjectPerHour': { + 'consumed': 5, + 'remaining': 955 + } + } + }, + True, True, None, True, False, + ), + ( + { + 'propertyQuota': { + 'concurrentRequests': { + 'consumed': 2, + 'remaining': 8 + }, + 'tokensPerProjectPerHour': { + 'consumed': 5, + # ~9% from original quota is left + 'remaining': 172 + } + } + }, + True, True, 1800, False, False, + ), + ( + { + 'propertyQuota': { + 'concurrentRequests': { + 'consumed': 9, + # 10% from original quota is left + 'remaining': 1 + }, + 'tokensPerProjectPerHour': { + 'consumed': 5, + 'remaining': 935 + } + } + }, + True, True, 30, False, False, + ) + ], + ids=[ + "Full", + "Partial", + "Running out tokensPerProjectPerHour", + "Running out concurrentRequests", + ] +) +def test_check_full_quota( + requests_mock, + expected_quota_list, + response_quota, + partial_quota, + should_retry_exp, + backoff_time_exp, + raise_on_http_errors_exp, + stop_iter_exp, +): + """ + Check the quota and prepare the initial values for subsequent comparison with subsequent response calls. + The default values for the scenario are expected when the quota is full. + """ + # Prepare instance + url = "https://analyticsdata.googleapis.com/v1beta/" + payload = response_quota + requests_mock.post(url, json=payload) + response = requests.post(url) + # process and prepare the scenario + TEST_QUOTA_INSTANCE._check_quota(response) + + # TEST BLOCK + + # Check the INITIAL QUOTA is saved properly + assert [quota in expected_quota_list for quota in TEST_QUOTA_INSTANCE.initial_quota.keys()] + + # Check the CURRENT QUOTA is different from Initial + if partial_quota: + current_quota = TEST_QUOTA_INSTANCE._get_known_quota_from_response(response.json().get('propertyQuota')) + assert not current_quota == TEST_QUOTA_INSTANCE.initial_quota + + # Check the scenario is applied based on Quota Values + # should_retry + assert TEST_QUOTA_INSTANCE.should_retry is should_retry_exp + # backoff_time + assert TEST_QUOTA_INSTANCE.backoff_time == backoff_time_exp + # raise_on_http_errors + assert TEST_QUOTA_INSTANCE.raise_on_http_errors is raise_on_http_errors_exp + # stop_iter + assert TEST_QUOTA_INSTANCE.stop_iter is stop_iter_exp diff --git a/airbyte-integrations/connectors/source-google-analytics-data-api/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-google-analytics-data-api/unit_tests/test_streams.py index 28f620ac62a61..90acba417f923 100644 --- a/airbyte-integrations/connectors/source-google-analytics-data-api/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-google-analytics-data-api/unit_tests/test_streams.py @@ -87,6 +87,7 @@ def test_request_body_json(patch_base_class): {"name": "browser"}, ], "dateRanges": [request_body_params["stream_slice"]], + "returnPropertyQuota": True, } request_body_json = GoogleAnalyticsDataApiBaseStream(authenticator=MagicMock(), config=patch_base_class["config"]).request_body_json(**request_body_params) @@ -258,7 +259,7 @@ def test_http_method(patch_base_class): [ (HTTPStatus.OK, False), (HTTPStatus.BAD_REQUEST, False), - (HTTPStatus.TOO_MANY_REQUESTS, False), + (HTTPStatus.TOO_MANY_REQUESTS, True), (HTTPStatus.INTERNAL_SERVER_ERROR, True), ], ) diff --git a/connectors.md b/connectors.md index 36743ca5fc890..dc7101d3d81a7 100644 --- a/connectors.md +++ b/connectors.md @@ -88,7 +88,7 @@ | **Gong** | Gong icon | Source | airbyte/source-gong:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/sources/gong) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-gong) | `32382e40-3b49-4b99-9c5c-4076501914e7` | | **Google Ads** | Google Ads icon | Source | airbyte/source-google-ads:0.2.10 | generally_available | [link](https://docs.airbyte.com/integrations/sources/google-ads) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-google-ads) | `253487c0-2246-43ba-a21f-5116b20a2c50` | | **Google Analytics (Universal Analytics)** | Google Analytics (Universal Analytics) icon | Source | airbyte/source-google-analytics-v4:0.1.34 | generally_available | [link](https://docs.airbyte.com/integrations/sources/google-analytics-universal-analytics) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-google-analytics-v4) | `eff3616a-f9c3-11eb-9a03-0242ac130003` | -| **Google Analytics 4 (GA4)** | Google Analytics 4 (GA4) icon | Source | airbyte/source-google-analytics-data-api:0.1.1 | beta | [link](https://docs.airbyte.com/integrations/sources/google-analytics-v4) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-google-analytics-data-api) | `3cc2eafd-84aa-4dca-93af-322d9dfeec1a` | +| **Google Analytics 4 (GA4)** | Google Analytics 4 (GA4) icon | Source | airbyte/source-google-analytics-data-api:0.1.2 | beta | [link](https://docs.airbyte.com/integrations/sources/google-analytics-v4) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-google-analytics-data-api) | `3cc2eafd-84aa-4dca-93af-322d9dfeec1a` | | **Google Directory** | Google Directory icon | Source | airbyte/source-google-directory:0.1.9 | alpha | [link](https://docs.airbyte.com/integrations/sources/google-directory) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-google-directory) | `d19ae824-e289-4b14-995a-0632eb46d246` | | **Google PageSpeed Insights** | Google PageSpeed Insights icon | Source | airbyte/source-google-pagespeed-insights:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/sources/google-pagespeed-insights) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-google-pagespeed-insights) | `1e9086ab-ddac-4c1d-aafd-ba43ff575fe4` | | **Google Search Console** | Google Search Console icon | Source | airbyte/source-google-search-console:0.1.20 | generally_available | [link](https://docs.airbyte.com/integrations/sources/google-search-console) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-google-search-console) | `eb4c9e00-db83-4d63-a386-39cfa91012a8` | diff --git a/docs/integrations/sources/google-analytics-data-api.md b/docs/integrations/sources/google-analytics-data-api.md index 7d7ab05358a63..8ae1e91684f36 100644 --- a/docs/integrations/sources/google-analytics-data-api.md +++ b/docs/integrations/sources/google-analytics-data-api.md @@ -96,6 +96,7 @@ added by default to all reports. There are 8 default reports. To add more report | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:---------------------------------------------------| +| 0.1.2 | 2023-03-07 | [23822](https://github.com/airbytehq/airbyte/pull/23822) | Improve `rate limits` customer faced error messages and retry logic for `429` | | 0.1.1 | 2023-01-10 | [21169](https://github.com/airbytehq/airbyte/pull/21169) | Slicer updated, unit tests added | | 0.1.0 | 2023-01-08 | [20889](https://github.com/airbytehq/airbyte/pull/20889) | Improved config validation, SAT | | 0.0.3 | 2022-08-15 | [15229](https://github.com/airbytehq/airbyte/pull/15229) | Source Google Analytics Data Api: code refactoring |