Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source linkedin-ads: retry 429/5xx when refreshing access token #17724

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,5 @@ COPY source_linkedin_ads ./source_linkedin_ads
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.10
LABEL io.airbyte.version=0.1.11
LABEL io.airbyte.name=airbyte/source-linkedin-ads
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ tests:
connection:
- config_path: "secrets/config_oauth.json"
status: "succeed"
timeout_seconds: 60
- config_path: "integration_tests/invalid_config.json"
status: "failed"
discovery:
- config_path: "secrets/config_oauth.json"
timeout_seconds: 60
basic_read:
- config_path: "secrets/config_oauth.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,24 @@
from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Tuple
from urllib.parse import urlencode

import backoff
import logging
import requests
from airbyte_cdk import AirbyteLogger
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.auth import Oauth2Authenticator, TokenAuthenticator
from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException

from .analytics import make_analytics_slices, merge_chunks, update_analytics_params
from .utils import get_parent_stream_values, transform_data


logger = logging.getLogger("airbyte")


class LinkedinAdsStream(HttpStream, ABC):
"""
Basic class provides base functionality for all streams.
Expand Down Expand Up @@ -279,7 +285,9 @@ def read_records(
parent_stream = self.parent_stream(config=self.config)
for record in parent_stream.read_records(**kwargs):
result_chunks = []
for analytics_slice in make_analytics_slices(record, self.parent_values_map, stream_state.get(self.cursor_field)):
for analytics_slice in make_analytics_slices(
record, self.parent_values_map, stream_state.get(self.cursor_field), self.config.get("end_date")
):
child_stream_slice = super().read_records(stream_slice=analytics_slice, **kwargs)
result_chunks.append(child_stream_slice)
yield from merge_chunks(result_chunks, self.cursor_field)
Expand Down Expand Up @@ -311,6 +319,34 @@ class AdCreativeAnalytics(LinkedInAdsAnalyticsStream):
pivot_by = "CREATIVE"


class LinkedinAdsOAuth2Authenticator(Oauth2Authenticator):
@backoff.on_exception(
backoff.expo,
DefaultBackoffException,
on_backoff=lambda details: logger.info(
f"Caught retryable error after {details['tries']} tries. Waiting {details['wait']} seconds then retrying..."
),
max_time=300,
)
def refresh_access_token(self) -> Tuple[str, int]:
try:
response = requests.request(
method="POST",
url=self.token_refresh_endpoint,
data=self.get_refresh_request_body(),
headers=self.get_refresh_access_token_headers(),
)
response.raise_for_status()
response_json = response.json()
return response_json["access_token"], response_json["expires_in"]
except requests.exceptions.RequestException as e:
if e.response.status_code == 429 or e.response.status_code >= 500:
raise DefaultBackoffException(request=e.response.request, response=e.response)
raise
except Exception as e:
raise Exception(f"Error while refreshing access token: {e}") from e


class SourceLinkedinAds(AbstractSource):
"""
Abstract Source inheritance, provides:
Expand All @@ -333,7 +369,7 @@ def get_authenticator(cls, config: Mapping[str, Any]) -> TokenAuthenticator:
access_token = config["credentials"]["access_token"] if auth_method else config["access_token"]
return TokenAuthenticator(token=access_token)
elif auth_method == "oAuth2.0":
return Oauth2Authenticator(
return LinkedinAdsOAuth2Authenticator(
token_refresh_endpoint="https://www.linkedin.com/oauth/v2/accessToken",
client_id=config["credentials"]["client_id"],
client_secret=config["credentials"]["client_secret"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
Campaigns,
Creatives,
SourceLinkedinAds,
LinkedinAdsOAuth2Authenticator,
)

TEST_OAUTH_CONFIG: dict = {
Expand Down Expand Up @@ -320,3 +321,20 @@ def test_request_params(self, stream_cls, slice, expected):
stream = stream_cls(TEST_CONFIG)
result = stream.request_params(stream_state={}, stream_slice=slice)
assert expected == result


def test_retry_get_access_token(requests_mock):
requests_mock.register_uri(
"POST",
"https://www.linkedin.com/oauth/v2/accessToken",
[{"status_code": 429}, {"status_code": 429}, {"status_code": 200, "json": {"access_token": "token", "expires_in": 3600}}],
)
auth = LinkedinAdsOAuth2Authenticator(
token_refresh_endpoint="https://www.linkedin.com/oauth/v2/accessToken",
client_id="client_id",
client_secret="client_secret",
refresh_token="refresh_token",
)
token = auth.get_access_token()
assert len(requests_mock.request_history) == 3
assert token == "token"
3 changes: 2 additions & 1 deletion docs/integrations/sources/linkedin-ads.md
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,8 @@ After 5 unsuccessful attempts - the connector will stop the sync operation. In s
## Changelog

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :------------------------------------------------------- | :---------------------------------------------------------------------------------------------------------------- |
|:--------|:-----------|:---------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------|
| 0.1.11 | 2022-10-07 | [17724](https://github.com/airbytehq/airbyte/pull/17724) | Retry 429/5xx errors when refreshing access token |
| 0.1.10 | 2022-09-28 | [17326](https://github.com/airbytehq/airbyte/pull/17326) | Migrate to per-stream states. |
| 0.1.9 | 2022-07-21 | [14924](https://github.com/airbytehq/airbyte/pull/14924) | Remove `additionalProperties` field from schemas |
| 0.1.8 | 2022-06-07 | [13495](https://github.com/airbytehq/airbyte/pull/13495) | Fixed `base-normalization` issue on `Destination Redshift` caused by wrong casting of `pivot` column |
Expand Down