Source LinkedIn Ads: move to next releaseStage #29045

Merged (11 commits) on Aug 7, 2023

@@ -33,5 +33,5 @@ COPY source_linkedin_ads ./source_linkedin_ads
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.1
LABEL io.airbyte.version=0.3.0
LABEL io.airbyte.name=airbyte/source-linkedin-ads
@@ -18,14 +18,14 @@ acceptance_tests:
tests:
- config_path: "secrets/config_oauth.json"
backward_compatibility_tests_config:
disable_for_version: 0.1.16 # migration to May 2023 Api Version; schema changes; stream removed
disable_for_version: 0.2.1 # new fields added to schemas: `Campaigns`, 'Campaign_Groups', `Creatives`
timeout_seconds: 60
basic_read:
tests:
- config_path: "secrets/config_oauth.json"
expect_records:
path: "integration_tests/expected_records.jsonl"
fail_on_extra_columns: false
fail_on_extra_columns: true
incremental:
tests:
- config_path: "secrets/config_oauth.json"
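
Flipping `fail_on_extra_columns` to `true` makes the basic read test fail whenever a record carries columns that the stream's JSON schema does not declare, which is what the schema additions further down account for. A rough sketch of the check this flag turns on (illustrative only, not the actual connector acceptance-test code; `schema` and `record` are invented):

```python
def extra_columns(record: dict, schema: dict) -> set:
    """Return record keys that are not declared in the stream's JSON schema."""
    return set(record) - set(schema.get("properties", {}))


# Invented example shaped like a Campaigns record.
schema = {"properties": {"id": {}, "status": {}, "totalBudget": {}}}
record = {"id": 1, "status": "ACTIVE", "totalBudget": {"amount": "100"}, "newField": True}

assert extra_columns(record, schema) == {"newField"}  # such a record would now fail the test
```
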

Large diffs are not rendered by default.

@@ -6,7 +6,7 @@ data:
connectorType: source
definitionId: 137ece28-5434-455c-8f34-69dc3782f451
maxSecondsBetweenMessages: 21600
dockerImageTag: 0.2.1
dockerImageTag: 0.3.0
dockerRepository: airbyte/source-linkedin-ads
githubIssueLabel: source-linkedin-ads
icon: linkedin.svg
@@ -29,6 +29,17 @@
"test": {
"type": ["null", "boolean"]
},
"totalBudget": {
"type": ["null", "object"],
"properties": {
"amount": {
"type": ["null", "string"]
},
"currencyCode": {
"type": ["null", "string"]
}
}
},
"servingStatuses": {
"type": ["null", "array"],
"items": {
@@ -46,6 +57,12 @@
},
"status": {
"type": ["null", "string"]
},
"allowedCampaignTypes": {
"type": ["null", "array"],
"items": {
"type": ["null", "string"]
}
}
}
}
@@ -120,6 +120,17 @@
}
}
},
"totalBudget": {
"type": ["null", "object"],
"properties": {
"amount": {
"type": ["null", "string"]
},
"currencyCode": {
"type": ["null", "string"]
}
}
},
"unitCost": {
"type": ["null", "object"],
"properties": {
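
Both the `Campaign_Groups` and `Campaigns` schemas gain a nested `totalBudget` object whose `amount` is typed as a string, mirroring how the API serializes money. A hypothetical record fragment (values invented) showing how a consumer might read it:

```python
from decimal import Decimal

# Hypothetical record fragment shaped like the updated schemas; values are invented.
record = {
    "status": "ACTIVE",
    "totalBudget": {"amount": "150.50", "currencyCode": "USD"},
}

# Monetary amounts arrive as strings, so downstream consumers convert explicitly.
budget = record["totalBudget"]
print(Decimal(budget["amount"]), budget["currencyCode"])  # 150.50 USD
```
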
@@ -27,22 +27,42 @@
"type": ["null", "string"]
},
"review": {
"type": ["null", "object"]
"type": ["null", "object"],
"properties" : {
"status": {
"type": ["null", "string"]
},
"rejectionReasons": {
"type": ["null", "array"]
}
}
},
"isServing": {
"type": ["null", "boolean"]
"isServing" : {
"type" : ["null", "boolean"]
},
"campaign": {
"type": ["null", "string"]
"campaign" : {
"type" : ["null", "string"]
},
"id": {
"type": ["null", "string"]
"id" : {
"type" : ["null", "string"]
},
"intendedStatus": {
"type": ["null", "string"]
"intendedStatus" : {
"type" : ["null", "string"]
},
"account": {
"type": ["null", "string"]
"account" : {
"type" : ["null", "string"]
},
"leadgenCallToAction" : {
"type" : ["null", "object"],
"properties" : {
"destination" : {
"type" : ["null", "string"]
},
"label" : {
"type" : ["null", "string"]
}
}
}
}
}
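
The Creatives schema now spells out the nested `review` object and adds `leadgenCallToAction`, so destinations that normalize JSON can unpack these fields into columns instead of keeping an opaque blob. A hypothetical record fragment (values invented):

```python
# Hypothetical Creatives record fragment matching the expanded schema; values are invented.
creative = {
    "id": "urn:li:sponsoredCreative:111111111",
    "intendedStatus": "ACTIVE",
    "review": {"status": "APPROVED", "rejectionReasons": []},
    "leadgenCallToAction": {"destination": "https://example.com/signup", "label": "Sign Up"},
}

print(creative["review"]["status"])              # APPROVED
print(creative["leadgenCallToAction"]["label"])  # Sign Up
```
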

@@ -6,13 +6,10 @@
import logging
from typing import Any, List, Mapping, Optional, Tuple, Union

import backoff
import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.auth import Oauth2Authenticator, TokenAuthenticator
from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException
from airbyte_cdk.sources.streams.http.requests_native_auth import Oauth2Authenticator, TokenAuthenticator
from source_linkedin_ads.streams import (
Accounts,
AccountUsers,
@@ -26,34 +23,6 @@
logger = logging.getLogger("airbyte")


class LinkedinAdsOAuth2Authenticator(Oauth2Authenticator):
@backoff.on_exception(
backoff.expo,
DefaultBackoffException,
on_backoff=lambda details: logger.info(
f"Caught retryable error after {details['tries']} tries. Waiting {details['wait']} seconds then retrying..."
),
max_time=300,
)
def refresh_access_token(self) -> Tuple[str, int]:
try:
response = requests.request(
method="POST",
url=self.token_refresh_endpoint,
data=self.get_refresh_request_body(),
headers=self.get_refresh_access_token_headers(),
)
response.raise_for_status()
response_json = response.json()
return response_json["access_token"], response_json["expires_in"]
except requests.exceptions.RequestException as e:
if e.response.status_code == 429 or e.response.status_code >= 500:
raise DefaultBackoffException(request=e.response.request, response=e.response)
raise
except Exception as e:
raise Exception(f"Error while refreshing access token: {e}") from e


class SourceLinkedinAds(AbstractSource):
"""
Abstract Source inheritance, provides:
@@ -62,7 +31,7 @@ class SourceLinkedinAds(AbstractSource):
"""

@classmethod
def get_authenticator(cls, config: Mapping[str, Any]) -> Union[TokenAuthenticator, LinkedinAdsOAuth2Authenticator]:
def get_authenticator(cls, config: Mapping[str, Any]) -> Union[TokenAuthenticator, Oauth2Authenticator]:
"""
Validate input parameters and generate a necessary Authentication object
This connectors support 2 auth methods:
@@ -76,7 +45,7 @@ def get_authenticator(cls, config: Mapping[str, Any]) -> Union[TokenAuthenticato
access_token = config["credentials"]["access_token"] if auth_method else config["access_token"]
return TokenAuthenticator(token=access_token)
elif auth_method == "oAuth2.0":
return LinkedinAdsOAuth2Authenticator(
return Oauth2Authenticator(
token_refresh_endpoint="https://www.linkedin.com/oauth/v2/accessToken",
client_id=config["credentials"]["client_id"],
client_secret=config["credentials"]["client_secret"],
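
The custom `LinkedinAdsOAuth2Authenticator` removed above duplicated retry behaviour that the CDK's `requests_native_auth.Oauth2Authenticator` already provides (the updated `test_retry_get_access_token` below exercises the CDK class against a mocked 429 on token refresh), so the source now instantiates the CDK class directly. A minimal sketch of the resulting selection logic, assuming the credentials layout implied by the snippets in this diff:

```python
from airbyte_cdk.sources.streams.http.requests_native_auth import Oauth2Authenticator, TokenAuthenticator

# Assumed config shape; key names follow the snippets shown in this diff.
config = {
    "credentials": {
        "auth_method": "oAuth2.0",
        "client_id": "<client_id>",
        "client_secret": "<client_secret>",
        "refresh_token": "<refresh_token>",
    }
}

auth_method = config.get("credentials", {}).get("auth_method")
if auth_method == "oAuth2.0":
    # Token refresh, including retries on transient errors, is handled by the CDK class.
    authenticator = Oauth2Authenticator(
        token_refresh_endpoint="https://www.linkedin.com/oauth/v2/accessToken",
        client_id=config["credentials"]["client_id"],
        client_secret=config["credentials"]["client_secret"],
        refresh_token=config["credentials"]["refresh_token"],
    )
else:
    access_token = config["credentials"]["access_token"] if auth_method else config["access_token"]
    authenticator = TokenAuthenticator(token=access_token)
```
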
@@ -35,6 +35,14 @@ class LinkedinAdsStream(HttpStream, ABC):
def __init__(self, config: Dict):
super().__init__(authenticator=config.get("authenticator"))
self.config = config
self.date_time_fields = self._get_date_time_items_from_schema()

def _get_date_time_items_from_schema(self):
"""
Get all properties from schema with format: 'date-time'
"""
schema = self.get_json_schema()
return [k for k, v in schema["properties"].items() if v.get("format") == "date-time"]

@property
def accounts(self):
@@ -83,7 +91,17 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
"""
We need to get out the nested complex data structures for further normalisation, so the transform_data method is applied.
"""
yield from transform_data(response.json().get("elements"))
for record in transform_data(response.json().get("elements")):
yield self._date_time_to_rfc3339(record)

def _date_time_to_rfc3339(self, record: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
"""
Transform 'date-time' items to RFC3339 format
"""
for item in record:
if item in self.date_time_fields and record[item]:
record[item] = pendulum.parse(record[item]).to_rfc3339_string()
return record

def should_retry(self, response: requests.Response) -> bool:
if response.status_code == 429:
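
The new `_date_time_to_rfc3339` helper above reparses every field whose schema declares `format: date-time` and emits it as an RFC 3339 string via `pendulum`. A standalone illustration of the conversion (the expected output matches the new unit test further down):

```python
import pendulum

raw = "2021-05-27 11:59:53.710000"              # timestamp as produced by the transform step
print(pendulum.parse(raw).to_rfc3339_string())  # 2021-05-27T11:59:53.710000+00:00
```
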
@@ -38,7 +38,6 @@ def get_parent_stream_values(record: Dict, key_value_map: Dict) -> Dict:
def transform_change_audit_stamps(
record: Dict, dict_key: str = "changeAuditStamps", props: List = ["created", "lastModified"], fields: List = ["time"]
) -> Mapping[str, Any]:

"""
:: EXAMPLE `changeAuditStamps` input structure:
{
@@ -96,7 +95,6 @@ def transform_date_range(
props: List = ["start", "end"],
fields: List = ["year", "month", "day"],
) -> Mapping[str, Any]:

"""
:: EXAMPLE `dateRange` input structure in Analytics streams:
{
@@ -320,7 +318,6 @@ def transform_data(records: List) -> Iterable[Mapping]:
to be properly normalised in the destination.
"""
for record in records:

if "changeAuditStamps" in record:
record = transform_change_audit_stamps(record)

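The `utils.py` hunks only drop stray blank lines, but `transform_data` is what feeds `parse_response` above, so for context here is a rough, hypothetical re-implementation of the `changeAuditStamps` flattening it performs (the epoch-millisecond input and UTC handling are assumptions, not taken from this diff):

```python
from datetime import datetime, timezone

# Hypothetical sketch: lift the nested changeAuditStamps block into flat,
# string-typed created/lastModified fields.
record = {"changeAuditStamps": {"created": {"time": 1622116793710}, "lastModified": {"time": 1622116793710}}}

stamps = record.pop("changeAuditStamps")
for prop in ("created", "lastModified"):
    ms = stamps[prop]["time"]  # assumed to be epoch milliseconds
    record[prop] = datetime.fromtimestamp(ms / 1000, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f")

print(record["lastModified"])  # 2021-05-27 11:59:53.710000, later normalized to RFC 3339
```
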
@@ -8,7 +8,7 @@
import pytest
import requests
from airbyte_cdk import AirbyteLogger
from airbyte_cdk.sources.streams.http.auth import Oauth2Authenticator, TokenAuthenticator
from airbyte_cdk.sources.streams.http.requests_native_auth import Oauth2Authenticator, TokenAuthenticator
from source_linkedin_ads.source import (
Accounts,
AccountUsers,
@@ -17,7 +17,6 @@
CampaignGroups,
Campaigns,
Creatives,
LinkedinAdsOAuth2Authenticator,
SourceLinkedinAds,
)
from source_linkedin_ads.streams import LINKEDIN_VERSION_API
@@ -302,7 +301,7 @@ def test_retry_get_access_token(requests_mock):
"https://www.linkedin.com/oauth/v2/accessToken",
[{"status_code": 429}, {"status_code": 429}, {"status_code": 200, "json": {"access_token": "token", "expires_in": 3600}}],
)
auth = LinkedinAdsOAuth2Authenticator(
auth = Oauth2Authenticator(
token_refresh_endpoint="https://www.linkedin.com/oauth/v2/accessToken",
client_id="client_id",
client_secret="client_secret",
@@ -311,3 +310,19 @@
token = auth.get_access_token()
assert len(requests_mock.request_history) == 3
assert token == "token"


@pytest.mark.parametrize(
"record, expected",
[
({}, {}),
({"lastModified": "2021-05-27 11:59:53.710000"}, {"lastModified": "2021-05-27T11:59:53.710000+00:00"}),
({"lastModified": None}, {"lastModified": None}),
({"lastModified": ""}, {"lastModified": ""}),
],
ids=["empty_record", "transformed_record", "null_value", "empty_value"],
)
def test_date_time_to_rfc3339(record, expected):
stream = Accounts(TEST_CONFIG)
result = stream._date_time_to_rfc3339(record)
assert result == expected
docs/integrations/sources/linkedin-ads.md (1 addition, 0 deletions)
@@ -182,6 +182,7 @@ After 5 unsuccessful attempts - the connector will stop the sync operation. In s

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------|
| 0.3.0 | 2023-05-30 | [29045](https://github.com/airbytehq/airbyte/pull/29045) | Add new fields to schemas; convert datetime fields to `rfc3339` |
| 0.2.1 | 2023-05-30 | [26780](https://github.com/airbytehq/airbyte/pull/26780) | Reduce records limit for Creatives Stream |
| 0.2.0 | 2023-05-23 | [26372](https://github.com/airbytehq/airbyte/pull/26372) | Migrate to LinkedIn API version: May 2023 |
| 0.1.16 | 2023-05-24 | [26512](https://github.com/airbytehq/airbyte/pull/26512) | Removed authSpecification from spec.json in favour of advancedAuth |