From a1ca0a928011a616579a60eefa1c5198b587ed83 Mon Sep 17 00:00:00 2001 From: Denys Davydov Date: Fri, 7 Oct 2022 15:15:08 +0300 Subject: [PATCH 1/3] #17506 source linkedin-ads: retry 429/5xx when refreshing access token --- .../connectors/source-linkedin-ads/Dockerfile | 2 +- .../acceptance-test-config.yml | 2 + .../source_linkedin_ads/source.py | 40 ++++++++++++++++++- .../unit_tests/source_tests/test_source.py | 18 +++++++++ docs/integrations/sources/linkedin-ads.md | 3 +- 5 files changed, 61 insertions(+), 4 deletions(-) diff --git a/airbyte-integrations/connectors/source-linkedin-ads/Dockerfile b/airbyte-integrations/connectors/source-linkedin-ads/Dockerfile index 66e0d7aca734d2..068b60e44cce20 100644 --- a/airbyte-integrations/connectors/source-linkedin-ads/Dockerfile +++ b/airbyte-integrations/connectors/source-linkedin-ads/Dockerfile @@ -33,5 +33,5 @@ COPY source_linkedin_ads ./source_linkedin_ads ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.10 +LABEL io.airbyte.version=0.1.11 LABEL io.airbyte.name=airbyte/source-linkedin-ads diff --git a/airbyte-integrations/connectors/source-linkedin-ads/acceptance-test-config.yml b/airbyte-integrations/connectors/source-linkedin-ads/acceptance-test-config.yml index 3ac55986fb544e..7a9b8bf2061811 100644 --- a/airbyte-integrations/connectors/source-linkedin-ads/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-linkedin-ads/acceptance-test-config.yml @@ -7,10 +7,12 @@ tests: connection: - config_path: "secrets/config_oauth.json" status: "succeed" + timeout_seconds: 60 - config_path: "integration_tests/invalid_config.json" status: "failed" discovery: - config_path: "secrets/config_oauth.json" + timeout_seconds: 60 basic_read: - config_path: "secrets/config_oauth.json" configured_catalog_path: "integration_tests/configured_catalog.json" diff --git a/airbyte-integrations/connectors/source-linkedin-ads/source_linkedin_ads/source.py b/airbyte-integrations/connectors/source-linkedin-ads/source_linkedin_ads/source.py index 5fbfdbe55c1df5..0e0ab25ccb1fdc 100644 --- a/airbyte-integrations/connectors/source-linkedin-ads/source_linkedin_ads/source.py +++ b/airbyte-integrations/connectors/source-linkedin-ads/source_linkedin_ads/source.py @@ -7,6 +7,8 @@ from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Tuple from urllib.parse import urlencode +import backoff +import logging import requests from airbyte_cdk import AirbyteLogger from airbyte_cdk.models import SyncMode @@ -14,11 +16,15 @@ from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams.http import HttpStream from airbyte_cdk.sources.streams.http.auth import Oauth2Authenticator, TokenAuthenticator +from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException from .analytics import make_analytics_slices, merge_chunks, update_analytics_params from .utils import get_parent_stream_values, transform_data +logger = logging.getLogger("airbyte") + + class LinkedinAdsStream(HttpStream, ABC): """ Basic class provides base functionality for all streams. @@ -279,7 +285,9 @@ def read_records( parent_stream = self.parent_stream(config=self.config) for record in parent_stream.read_records(**kwargs): result_chunks = [] - for analytics_slice in make_analytics_slices(record, self.parent_values_map, stream_state.get(self.cursor_field)): + for analytics_slice in make_analytics_slices( + record, self.parent_values_map, stream_state.get(self.cursor_field), self.config.get("end_date") + ): child_stream_slice = super().read_records(stream_slice=analytics_slice, **kwargs) result_chunks.append(child_stream_slice) yield from merge_chunks(result_chunks, self.cursor_field) @@ -311,6 +319,34 @@ class AdCreativeAnalytics(LinkedInAdsAnalyticsStream): pivot_by = "CREATIVE" +class LinkedinAdsOAuth2Authenticator(Oauth2Authenticator): + @backoff.on_exception( + backoff.expo, + DefaultBackoffException, + on_backoff=lambda details: logger.info( + f"Caught retryable error after {details['tries']} tries. Waiting {details['wait']} seconds then retrying..." + ), + max_time=300, + ) + def refresh_access_token(self) -> Tuple[str, int]: + try: + response = requests.request( + method="POST", + url=self.token_refresh_endpoint, + data=self.get_refresh_request_body(), + headers=self.get_refresh_access_token_headers(), + ) + response.raise_for_status() + response_json = response.json() + return response_json["access_token"], response_json["expires_in"] + except requests.exceptions.RequestException as e: + if e.response.status_code == 429 or e.response.status_code >= 500: + raise DefaultBackoffException(request=e.response.request, response=e.response) + raise + except Exception as e: + raise Exception(f"Error while refreshing access token: {e}") from e + + class SourceLinkedinAds(AbstractSource): """ Abstract Source inheritance, provides: @@ -333,7 +369,7 @@ def get_authenticator(cls, config: Mapping[str, Any]) -> TokenAuthenticator: access_token = config["credentials"]["access_token"] if auth_method else config["access_token"] return TokenAuthenticator(token=access_token) elif auth_method == "oAuth2.0": - return Oauth2Authenticator( + return LinkedinAdsOAuth2Authenticator( token_refresh_endpoint="https://www.linkedin.com/oauth/v2/accessToken", client_id=config["credentials"]["client_id"], client_secret=config["credentials"]["client_secret"], diff --git a/airbyte-integrations/connectors/source-linkedin-ads/unit_tests/source_tests/test_source.py b/airbyte-integrations/connectors/source-linkedin-ads/unit_tests/source_tests/test_source.py index 3aeaff1f6ffa43..eeab0264cca563 100644 --- a/airbyte-integrations/connectors/source-linkedin-ads/unit_tests/source_tests/test_source.py +++ b/airbyte-integrations/connectors/source-linkedin-ads/unit_tests/source_tests/test_source.py @@ -19,6 +19,7 @@ Campaigns, Creatives, SourceLinkedinAds, + LinkedinAdsOAuth2Authenticator, ) TEST_OAUTH_CONFIG: dict = { @@ -320,3 +321,20 @@ def test_request_params(self, stream_cls, slice, expected): stream = stream_cls(TEST_CONFIG) result = stream.request_params(stream_state={}, stream_slice=slice) assert expected == result + + +def test_retry_get_access_token(requests_mock): + requests_mock.register_uri( + "POST", + "https://www.linkedin.com/oauth/v2/accessToken", + [{"status_code": 429}, {"status_code": 429}, {"status_code": 200, "json": {"access_token": "token", "expires_in": 3600}}], + ) + auth = LinkedinAdsOAuth2Authenticator( + token_refresh_endpoint="https://www.linkedin.com/oauth/v2/accessToken", + client_id="client_id", + client_secret="client_secret", + refresh_token="refresh_token", + ) + token = auth.get_access_token() + assert len(requests_mock.request_history) == 3 + assert token == "token" diff --git a/docs/integrations/sources/linkedin-ads.md b/docs/integrations/sources/linkedin-ads.md index cf4048c25e04a6..8e4882893bad6c 100644 --- a/docs/integrations/sources/linkedin-ads.md +++ b/docs/integrations/sources/linkedin-ads.md @@ -181,7 +181,8 @@ After 5 unsuccessful attempts - the connector will stop the sync operation. In s ## Changelog | Version | Date | Pull Request | Subject | -| :------ | :--------- | :------------------------------------------------------- | :---------------------------------------------------------------------------------------------------------------- | +|:--------|:-----------|:---------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------| +| 0.1.11 | 2022-10-07 | [00000](https://github.com/airbytehq/airbyte/pull/00000) | Retry 429/5xx errors when refreshing access token | | 0.1.10 | 2022-09-28 | [17326](https://github.com/airbytehq/airbyte/pull/17326) | Migrate to per-stream states. | | 0.1.9 | 2022-07-21 | [14924](https://github.com/airbytehq/airbyte/pull/14924) | Remove `additionalProperties` field from schemas | | 0.1.8 | 2022-06-07 | [13495](https://github.com/airbytehq/airbyte/pull/13495) | Fixed `base-normalization` issue on `Destination Redshift` caused by wrong casting of `pivot` column | From e35adb9010051d61cbec5125345e31872815ce2f Mon Sep 17 00:00:00 2001 From: Denys Davydov Date: Fri, 7 Oct 2022 15:17:37 +0300 Subject: [PATCH 2/3] source linkedin-ads: upd changelog --- docs/integrations/sources/linkedin-ads.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/integrations/sources/linkedin-ads.md b/docs/integrations/sources/linkedin-ads.md index 8e4882893bad6c..440e04fdafb642 100644 --- a/docs/integrations/sources/linkedin-ads.md +++ b/docs/integrations/sources/linkedin-ads.md @@ -182,7 +182,7 @@ After 5 unsuccessful attempts - the connector will stop the sync operation. In s | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------| -| 0.1.11 | 2022-10-07 | [00000](https://github.com/airbytehq/airbyte/pull/00000) | Retry 429/5xx errors when refreshing access token | +| 0.1.11 | 2022-10-07 | [17724](https://github.com/airbytehq/airbyte/pull/17724) | Retry 429/5xx errors when refreshing access token | | 0.1.10 | 2022-09-28 | [17326](https://github.com/airbytehq/airbyte/pull/17326) | Migrate to per-stream states. | | 0.1.9 | 2022-07-21 | [14924](https://github.com/airbytehq/airbyte/pull/14924) | Remove `additionalProperties` field from schemas | | 0.1.8 | 2022-06-07 | [13495](https://github.com/airbytehq/airbyte/pull/13495) | Fixed `base-normalization` issue on `Destination Redshift` caused by wrong casting of `pivot` column | From 510eae0f06591eab3831f32338e958947d6afc26 Mon Sep 17 00:00:00 2001 From: Octavia Squidington III Date: Fri, 7 Oct 2022 14:12:48 +0000 Subject: [PATCH 3/3] auto-bump connector version [ci skip] --- .../init/src/main/resources/seed/source_definitions.yaml | 2 +- airbyte-config/init/src/main/resources/seed/source_specs.yaml | 2 +- .../source-linkedin-ads/source_linkedin_ads/source.py | 3 +-- .../source-linkedin-ads/unit_tests/source_tests/test_source.py | 2 +- 4 files changed, 4 insertions(+), 5 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 610338dcab5300..e1804948b6cd4c 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -566,7 +566,7 @@ - name: LinkedIn Ads sourceDefinitionId: 137ece28-5434-455c-8f34-69dc3782f451 dockerRepository: airbyte/source-linkedin-ads - dockerImageTag: 0.1.10 + dockerImageTag: 0.1.11 documentationUrl: https://docs.airbyte.io/integrations/sources/linkedin-ads icon: linkedin.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 79b9016c153b47..e121e62507945e 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -5674,7 +5674,7 @@ path_in_connector_config: - "credentials" - "client_secret" -- dockerImage: "airbyte/source-linkedin-ads:0.1.10" +- dockerImage: "airbyte/source-linkedin-ads:0.1.11" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/linkedin-ads" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-linkedin-ads/source_linkedin_ads/source.py b/airbyte-integrations/connectors/source-linkedin-ads/source_linkedin_ads/source.py index 0e0ab25ccb1fdc..666a3fb53e53e3 100644 --- a/airbyte-integrations/connectors/source-linkedin-ads/source_linkedin_ads/source.py +++ b/airbyte-integrations/connectors/source-linkedin-ads/source_linkedin_ads/source.py @@ -3,12 +3,12 @@ # +import logging from abc import ABC, abstractproperty from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Tuple from urllib.parse import urlencode import backoff -import logging import requests from airbyte_cdk import AirbyteLogger from airbyte_cdk.models import SyncMode @@ -21,7 +21,6 @@ from .analytics import make_analytics_slices, merge_chunks, update_analytics_params from .utils import get_parent_stream_values, transform_data - logger = logging.getLogger("airbyte") diff --git a/airbyte-integrations/connectors/source-linkedin-ads/unit_tests/source_tests/test_source.py b/airbyte-integrations/connectors/source-linkedin-ads/unit_tests/source_tests/test_source.py index eeab0264cca563..2f666c1605c398 100644 --- a/airbyte-integrations/connectors/source-linkedin-ads/unit_tests/source_tests/test_source.py +++ b/airbyte-integrations/connectors/source-linkedin-ads/unit_tests/source_tests/test_source.py @@ -18,8 +18,8 @@ CampaignGroups, Campaigns, Creatives, - SourceLinkedinAds, LinkedinAdsOAuth2Authenticator, + SourceLinkedinAds, ) TEST_OAUTH_CONFIG: dict = {