Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Source Pinterest: returns config_error after low-code migration without reset #39559

Merged
merged 29 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
97eebbc
Ensure Pinterest returns config_error after low-code migration withou…
maxi297 Jun 18, 2024
5f942d1
Update release information
maxi297 Jun 18, 2024
01c857e
Move from auth to requests_native_auth package for authentication
maxi297 Jun 18, 2024
d0a4076
Merge branch 'issue-8257/update-pinterest' of https://github.com/airb…
ChristoGrab Jun 18, 2024
2bf5679
update authenticator package
ChristoGrab Jun 18, 2024
25f414d
use requests library for fetching report data
ChristoGrab Jun 19, 2024
816ee0b
chore: format
ChristoGrab Jun 19, 2024
1c3e150
chore: fix unit test
ChristoGrab Jun 19, 2024
4ab14cb
Merge branch 'master' into issue-8257/update-pinterest
ChristoGrab Jun 19, 2024
dd9aaae
Merge branch 'issue-8257/update-pinterest' of https://github.com/airb…
ChristoGrab Jun 19, 2024
7602d54
chore: fix unit tests
ChristoGrab Jun 19, 2024
ff8ffb0
chore: format
ChristoGrab Jun 19, 2024
1eb535a
chore: fix last unit tests
ChristoGrab Jun 19, 2024
9b22fa8
add authenticator unit tests
ChristoGrab Jun 20, 2024
9e16001
Merge branch 'master' into issue-8257/update-pinterest
ChristoGrab Jun 21, 2024
ec76eac
update configured_catalog for CAT tests
ChristoGrab Jun 21, 2024
74adf82
Merge branch 'issue-8257/update-pinterest' of https://github.com/airb…
ChristoGrab Jun 21, 2024
4178d74
Fix parent stream state issue
maxi297 Jun 28, 2024
23df232
Merge branch 'master' into issue-8257/update-pinterest
maxi297 Jun 28, 2024
5dade79
Merge branch 'master' into issue-8257/update-pinterest
maxi297 Jun 29, 2024
5ae4ad7
Merge branch 'master' into issue-8257/update-pinterest
bazarnov Jul 1, 2024
a68b373
Merge branch 'master' into issue-8257/update-pinterest
maxi297 Jul 2, 2024
f43b63c
Removing some incremental streams
maxi297 Jul 2, 2024
22494b4
Merge branch 'master' into issue-8257/update-pinterest
maxi297 Jul 29, 2024
f36ffd2
Migration for airbyte_cdk version 3
maxi297 Jul 29, 2024
5fb4f3b
Remove custom BackoffStrategy
maxi297 Jul 30, 2024
78fbd03
format
maxi297 Jul 30, 2024
7a2e26b
add missing file
maxi297 Jul 30, 2024
2dc9bff
Update CDK version to have newest backoffstrategy interface
maxi297 Jul 31, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 5cb7e5fe-38c2-11ec-8d3d-0242ac130003
dockerImageTag: 2.0.7
dockerImageTag: 2.0.8
dockerRepository: airbyte/source-pinterest
connectorBuildOptions:
baseImage: docker.io/airbyte/python-connector-base:2.0.0@sha256:c44839ba84406116e8ba68722a0f30e8f6e7056c726f447681bb9e9ece8bd916
Expand Down
228 changes: 156 additions & 72 deletions airbyte-integrations/connectors/source-pinterest/poetry.lock

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "2.0.7"
version = "2.0.8"
name = "source-pinterest"
description = "Source implementation for Pinterest."
authors = [ "Airbyte <contact@airbyte.io>",]
Expand All @@ -16,9 +16,9 @@ repository = "https://github.com/airbytehq/airbyte"
include = "source_pinterest"

[tool.poetry.dependencies]
python = "^3.9,<3.12"
python = "^3.10,<3.12"
pendulum = "==2.1.2"
airbyte-cdk = "0.86.3"
airbyte-cdk = "^4"

[tool.poetry.scripts]
source-pinterest = "source_pinterest.run:run"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

from typing import Any, List, Mapping, Optional, Tuple

import pendulum
import requests
from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException
from airbyte_cdk.sources.streams.http.requests_native_auth import Oauth2Authenticator
from airbyte_cdk.utils import AirbyteTracedException
from airbyte_cdk.utils.airbyte_secrets_utils import add_to_secrets


class PinterestOauthAuthenticator(Oauth2Authenticator):
"""
Custom implementation of Oauth2Authenticator that allows injection of auth headers in call to refresh access token
"""

def __init__(
self,
token_refresh_endpoint: str,
client_id: str,
client_secret: str,
refresh_token: str,
scopes: List[str] = None,
token_expiry_date: pendulum.DateTime = None,
token_expiry_date_format: str = None,
access_token_name: str = "access_token",
expires_in_name: str = "expires_in",
refresh_request_body: Mapping[str, Any] = None,
grant_type: str = "refresh_token",
token_expiry_is_time_of_expiration: bool = False,
refresh_token_error_status_codes: Tuple[int, ...] = (),
refresh_token_error_key: str = "",
refresh_token_error_values: Tuple[str, ...] = (),
refresh_access_token_headers: Optional[Mapping[str, Any]] = None,
):
super().__init__(
token_refresh_endpoint,
client_id,
client_secret,
refresh_token,
scopes,
token_expiry_date,
token_expiry_date_format,
access_token_name,
expires_in_name,
refresh_request_body,
grant_type,
token_expiry_is_time_of_expiration,
refresh_token_error_status_codes,
refresh_token_error_key,
refresh_token_error_values,
)
self._refresh_access_token_headers = refresh_access_token_headers

def _get_refresh_access_token_response(self) -> Any:
try:
headers = self._refresh_access_token_headers or {}
response = requests.request(
method="POST",
url=self.get_token_refresh_endpoint(),
data=self.build_refresh_request_body(),
headers=headers,
)
if response.ok:
response_json = response.json()
access_key = response_json.get(self.get_access_token_name())
if not access_key:
raise Exception(f"Token refresh API response was missing access token {self.get_access_token_name()}")
add_to_secrets(access_key)
self._log_response(response)
return response_json
else:
self._log_response(response)
response.raise_for_status()
except requests.exceptions.RequestException as e:
if e.response is not None:
if e.response.status_code == 429 or e.response.status_code >= 500:
raise DefaultBackoffException(request=e.response.request, response=e.response)
if self._wrap_refresh_token_exception(e):
message = "Refresh token is invalid or expired. Please re-authenticate from Sources/<your source>/Settings."
raise AirbyteTracedException(internal_message=message, message=message, failure_type=FailureType.config_error)
raise
except Exception as e:
raise Exception(f"Error while refreshing access token: {e}") from e
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from enum import Enum
from typing import List, Optional

from pydantic import BaseModel
from pydantic.v1 import BaseModel


class ReportStatus(str, Enum):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,16 +148,14 @@ def _init_reports(self, init_reports) -> List[ReportInfo]:

def _http_get(self, url, params=None, headers=None):
"""Make a GET request to the given URL and return the response as a JSON."""
response = self._session.get(url, params=params, headers=headers)
response = self._http_client._session.get(url, params=params, headers=headers)
response.raise_for_status()
return response.json()

def _verify_report_status(self, report: dict, stream_slice: Mapping[str, Any]) -> tuple:
"""Verify the report status and return it along with the report URL."""
api_path = self._build_api_path(stream_slice["parent"]["id"])
response_data = self._http_get(
urljoin(self.url_base, api_path), params={"token": report.token}, headers=self.authenticator.get_auth_header()
)
response_data = self._http_get(urljoin(self.url_base, api_path), params={"token": report.token})
try:
report_status = ReportStatusDetails.parse_raw(json.dumps(response_data))
except ValueError as error:
Expand All @@ -166,7 +164,9 @@ def _verify_report_status(self, report: dict, stream_slice: Mapping[str, Any]) -

def _fetch_report_data(self, url: str) -> dict:
"""Fetch the report data from the given URL."""
return self._http_get(url)
response = requests.get(url)
response.raise_for_status()
return response.json()

@lru_cache(maxsize=None)
def get_json_schema(self) -> Mapping[str, Any]:
Expand Down Expand Up @@ -265,9 +265,10 @@ def level(self):

class CustomReport(PinterestAnalyticsTargetingReportStream):
def __init__(self, **kwargs):
# as HttpStream.__init__ requires the name of the stream, we need to assign `self._custom_class_name` before calling the parent
self._custom_class_name = f"Custom_{kwargs['config']['name']}"
super().__init__(**kwargs)

self._custom_class_name = f"Custom_{self.config['name']}"
self._level = self.config["level"]
self.granularity = self.config["granularity"]
self.click_window_days = self.config["click_window_days"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@
from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.auth import Oauth2Authenticator
from airbyte_cdk.sources.streams.http.requests_native_auth import Oauth2Authenticator
from airbyte_cdk.utils import AirbyteTracedException
from source_pinterest.reports import CampaignAnalyticsReport

from .python_stream_auth import PinterestOauthAuthenticator
from .reports.reports import (
AdGroupReport,
AdGroupTargetingReport,
Expand Down Expand Up @@ -75,7 +76,7 @@ def get_authenticator(config) -> Oauth2Authenticator:
).decode("ascii")
auth = f"Basic {credentials_base64_encoded}"

return Oauth2Authenticator(
return PinterestOauthAuthenticator(
token_refresh_endpoint=f"{PinterestStream.url_base}oauth/token",
client_secret=config.get("client_secret"),
client_id=config.get("client_id"),
Expand All @@ -88,27 +89,38 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
report_config = self._validate_and_transform(config, amount_of_days_allowed_for_lookup=913)

declarative_streams = super().streams(config)
ad_accounts = [stream for stream in declarative_streams if stream.name == "ad_accounts"][0]

# Report streams involve async data fetch, which is currently not supported in low-code
report_streams = [
CampaignAnalyticsReport(ad_accounts, config=report_config),
CampaignTargetingReport(ad_accounts, config=report_config),
AdvertiserReport(ad_accounts, config=report_config),
AdvertiserTargetingReport(ad_accounts, config=report_config),
AdGroupReport(ad_accounts, config=report_config),
AdGroupTargetingReport(ad_accounts, config=report_config),
PinPromotionReport(ad_accounts, config=report_config),
PinPromotionTargetingReport(ad_accounts, config=report_config),
ProductGroupReport(ad_accounts, config=report_config),
ProductGroupTargetingReport(ad_accounts, config=report_config),
KeywordReport(ad_accounts, config=report_config),
ProductItemReport(ad_accounts, config=report_config),
] + self.get_custom_report_streams(ad_accounts, config=report_config)
CampaignAnalyticsReport(self._create_ad_accounts_stream(config), config=report_config),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maxi297 and I discussed this aspect a little bit. Although its not great to have to repeat this action so many times to generate independent streams, it doesn't have an I/O on the network and we only do this at the beginning so the performance impact is minimal.

We did play around with deep copy which didn't quite work on the nested objects among other reasons.

We floated a few ideas on how we might address this for other sources that do have dependencies on the API

  • a light copy of a stream that doesn't share state
  • a full custom copy of a stream
  • a way to extract only a single stream from the manifest or something

either way, this is not worth blocking on, but something we'll need to at some point solve for

CampaignTargetingReport(self._create_ad_accounts_stream(config), config=report_config),
AdvertiserReport(self._create_ad_accounts_stream(config), config=report_config),
AdvertiserTargetingReport(self._create_ad_accounts_stream(config), config=report_config),
AdGroupReport(self._create_ad_accounts_stream(config), config=report_config),
AdGroupTargetingReport(self._create_ad_accounts_stream(config), config=report_config),
PinPromotionReport(self._create_ad_accounts_stream(config), config=report_config),
PinPromotionTargetingReport(self._create_ad_accounts_stream(config), config=report_config),
ProductGroupReport(self._create_ad_accounts_stream(config), config=report_config),
ProductGroupTargetingReport(self._create_ad_accounts_stream(config), config=report_config),
KeywordReport(self._create_ad_accounts_stream(config), config=report_config),
ProductItemReport(self._create_ad_accounts_stream(config), config=report_config),
] + self.get_custom_report_streams(config=report_config)

return declarative_streams + report_streams

def get_custom_report_streams(self, parent, config: Mapping[str, Any]) -> List[Stream]:
def _create_ad_accounts_stream(self, config):
"""
Sync the recent changes in RFR, it is not possible to re-use the same stream as a parent stream as after a full refresh of the
parent stream, the internal state of the parent stream will switch to FULL_REFRESH_SYNC_COMPLETE_KEY and will prevent syncing any
more records. Hence, each parents need to have their own parent stream.

In terms of caching, this parent stream wasn't cache on version 2.0.3 so we would do two calls to
"https://api.pinterest.com/v5/ad_accounts" for each stream: one during the availability strategy and one during the read. This means
that this change should not impact performance.
"""
return [stream for stream in super().streams(config) if stream.name == "ad_accounts"][0]

def get_custom_report_streams(self, config: Mapping[str, Any]) -> List[Stream]:
"""return custom report streams"""
custom_streams = []
for report_config in config.get("custom_reports", []):
Expand All @@ -129,6 +141,6 @@ def get_custom_report_streams(self, parent, config: Mapping[str, Any]) -> List[S

report_config = self._validate_and_transform(report_config, amount_of_days_allowed_for_lookup)

stream = CustomReport(parent=parent, config=report_config)
stream = CustomReport(parent=self._create_ad_accounts_stream(config), config=report_config)
custom_streams.append(stream)
return custom_streams
Loading
Loading