Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨Source Amazon Ads: migrate source to YamlDeclarativeSource with custom check_connection #35481

Merged
merged 9 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: c6b0a29e-1da9-4512-9002-7bfd0cba2246
dockerImageTag: 4.0.3
dockerImageTag: 4.0.4
dockerRepository: airbyte/source-amazon-ads
documentationUrl: https://docs.airbyte.com/integrations/sources/amazon-ads
githubIssueLabel: source-amazon-ads
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "4.0.3"
version = "4.0.4"
name = "source-amazon-ads"
description = "Source implementation for Amazon Ads."
authors = [ "Airbyte <contact@airbyte.io>",]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#


from logging import Logger
from typing import Any, List, Mapping

from airbyte_cdk.models import AirbyteConnectionStatus
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
from airbyte_cdk.sources.streams import Stream
from airbyte_protocol.models import ConnectorSpecification


class DeclarativeSourceAdapter(YamlDeclarativeSource):
def __init__(self, source: AbstractSource) -> None:
self._source = source
super().__init__(path_to_yaml="manifest.yaml")
self._set_adapted_methods()

@property
def name(self) -> str:
return self._source.name

def spec(self, logger: Logger) -> ConnectorSpecification:
return self._source.spec(logger)

def check(self, logger: Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
return self._source.check(logger, config)

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
return self._source.streams(config)

def _validate_source(self) -> None:
"""Skipping manifest validation as it can be incomplete when use adapter"""
return

def _set_adapted_methods(self) -> None:
girarda marked this conversation as resolved.
Show resolved Hide resolved
"""
Since the adapter is intended to smoothly migrate the connector,
this method determines whether each of methods `spec`, `check`, and `streams` was declared in the manifest file
and if yes, makes the source use it, otherwise the method defined in the source will be used
"""
adapted_methods = ("spec", "check", "streams")
for method in adapted_methods:
if method in self.resolved_manifest:
self._source.__setattr__(method, getattr(super(), method))
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
version: 0.60.1
type: source_amazon_ads.SourceAmazonAds
spec:
type: Spec
documentation_url: https://docs.airbyte.com/integrations/sources/amazon-ads
connection_specification:
title: Amazon Ads Spec
type: object
properties:
auth_type:
title: Auth Type
const: oauth2.0
order: 0
type: string
client_id:
title: Client ID
description:
The client ID of your Amazon Ads developer application. See the
<a href="https://advertising.amazon.com/API/docs/en-us/get-started/generate-api-tokens#retrieve-your-client-id-and-client-secret">docs</a>
for more information.
order: 1
type: string
airbyte_secret: true
client_secret:
title: Client Secret
description:
The client secret of your Amazon Ads developer application. See
the <a href="https://advertising.amazon.com/API/docs/en-us/get-started/generate-api-tokens#retrieve-your-client-id-and-client-secret">docs</a>
for more information.
airbyte_secret: true
order: 2
type: string
refresh_token:
title: Refresh Token
description:
Amazon Ads refresh token. See the <a href="https://advertising.amazon.com/API/docs/en-us/get-started/generate-api-tokens">docs</a>
for more information on how to obtain this token.
airbyte_secret: true
order: 3
type: string
region:
title: Region
description:
Region to pull data from (EU/NA/FE). See <a href="https://advertising.amazon.com/API/docs/en-us/info/api-overview#api-endpoints">docs</a>
for more details.
enum:
- NA
- EU
- FE
type: string
default: NA
order: 4
start_date:
title: Start Date
description:
The Start date for collecting reports, should not be more than
60 days in the past. In YYYY-MM-DD format
pattern: "^[0-9]{4}-[0-9]{2}-[0-9]{2}$"
format: date
examples:
- "2022-10-10"
- "2022-10-22"
order: 5
type: string
profiles:
title: Profile IDs
description: 'Profile IDs you want to fetch data for. See <a href="https://advertising.amazon.com/API/docs/en-us/concepts/authorization/profiles">docs</a> for more details. Note: If Marketplace IDs are also selected, profiles will be selected if they match the Profile ID OR the Marketplace ID.'
order: 6
type: array
items:
type: integer
marketplace_ids:
title: Marketplace IDs
description: "Marketplace IDs you want to fetch data for. Note: If Profile IDs are also selected, profiles will be selected if they match the Profile ID OR the Marketplace ID."
order: 7
type: array
items:
type: string
state_filter:
title: State Filter
description: Reflects the state of the Display, Product, and Brand Campaign streams as enabled, paused, or archived. If you do not populate this field, it will be ignored completely.
items:
type: string
enum:
- enabled
- paused
- archived
type: array
uniqueItems: true
order: 8
look_back_window:
title: "Look Back Window"
description: "The amount of days to go back in time to get the updated data from Amazon Ads"
examples:
- 3
- 10
type: "integer"
default: 3
order: 9
report_record_types:
title: Report Record Types
description:
Optional configuration which accepts an array of string of record types.
Leave blank for default behaviour to pull all report types.
Use this config option only if you want to pull specific report type(s).
See <a href="https://advertising.amazon.com/API/docs/en-us/reporting/v2/report-types">docs</a>
for more details
items:
type: string
enum:
- adGroups
- asins
- asins_keywords
- asins_targets
- campaigns
- keywords
- productAds
- targets
type: array
uniqueItems: true
order: 10
required:
- client_id
- client_secret
- refresh_token
additionalProperties: true
advanced_auth:
auth_flow_type: oauth2.0
predicate_key:
- auth_type
predicate_value: oauth2.0
oauth_config_specification:
oauth_user_input_from_connector_config_specification:
type: object
additionalProperties: false
properties:
region:
type: string
path_in_connector_config:
- region
complete_oauth_output_specification:
type: object
additionalProperties: true
properties:
refresh_token:
type: string
path_in_connector_config:
- refresh_token
complete_oauth_server_input_specification:
type: object
additionalProperties: true
properties:
client_id:
type: string
client_secret:
type: string
complete_oauth_server_output_specification:
type: object
additionalProperties: true
properties:
client_id:
type: string
path_in_connector_config:
- client_id
client_secret:
type: string
path_in_connector_config:
- client_secret
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@
from airbyte_cdk.entrypoint import launch
from source_amazon_ads import SourceAmazonAds
from source_amazon_ads.config_migrations import MigrateStartDate
from source_amazon_ads.declarative_source_adapter import DeclarativeSourceAdapter


def run():
source = SourceAmazonAds()
source = DeclarativeSourceAdapter(source=SourceAmazonAds())
MigrateStartDate.migrate(sys.argv[1:], source)
launch(source, sys.argv[1:])
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@


class SourceAmazonAds(AbstractSource):
def _validate_and_transform(self, config: Mapping[str, Any]):
def _validate_and_transform(self, config: Mapping[str, Any]) -> Mapping[str, Any]:
start_date = config.get("start_date")
if start_date:
config["start_date"] = pendulum.from_format(start_date, CONFIG_DATE_FORMAT).date()
Expand All @@ -69,7 +69,8 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) ->
"""
:param config: the user-input config object conforming to the connector's spec.json
:param logger: logger object
:return Tuple[bool, any]: (True, None) if the input config can be used to connect to the API successfully, (False, error) otherwise.
:return Tuple[bool, any]: (True, None) if the input config can be used to connect to the API successfully,
(False, error) otherwise.
"""
try:
config = self._validate_and_transform(config)
Expand All @@ -78,7 +79,7 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) ->
# Check connection by sending list of profiles request. Its most simple
# request, not require additional parameters and usually has few data
# in response body.
# It doesnt support pagination so there is no sense of reading single
# It doesn't support pagination so there is no sense of reading single
# record, it would fetch all the data anyway.
profiles_list = Profiles(config, authenticator=self._make_authenticator(config)).get_all_profiles()
filtered_profiles = self._choose_profiles(config, profiles_list)
Expand All @@ -89,15 +90,15 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) ->
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
"""
:param config: A Mapping of the user input configuration as defined in the connector spec.
:return list of streams for current source
:return: list of streams for current source
"""
config = self._validate_and_transform(config)
auth = self._make_authenticator(config)
stream_args = {"config": config, "authenticator": auth}
# All data for individual Amazon Ads stream divided into sets of data for
# each profile. Every API request except profiles has required
# paramater passed over "Amazon-Advertising-API-Scope" http header and
# should contain profile id. So every stream is dependant on Profiles
# parameter passed over "Amazon-Advertising-API-Scope" http header and
# should contain profile id. So every stream is dependent on Profiles
# stream and should have information about all profiles.
profiles_stream = Profiles(**stream_args)
profiles_list = profiles_stream.get_all_profiles()
Expand Down
Loading
Loading