Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Source Amazon Seller Partner: improve UX #32505

Merged
merged 54 commits into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
0905a2a
Source Bing Ads: improve UX
artem1205 Nov 14, 2023
1fa68c6
Source Amazon Seller Partner: remove advanced_stream_options
artem1205 Nov 15, 2023
fabb498
Source Amazon Seller Partner: remove advanced_stream_options
artem1205 Nov 15, 2023
0c351fd
Merge remote-tracking branch 'origin/artem1205/source-amazon-seller-p…
artem1205 Nov 15, 2023
e58f4c5
Merge remote-tracking branch 'origin/master' into artem1205/source-am…
artem1205 Nov 15, 2023
8a24e91
Automated Commit - Formatting Changes
artem1205 Nov 15, 2023
25090e7
Source Amazon Seller Partner: fix merge conflict
artem1205 Nov 15, 2023
513e2bd
Source Amazon Seller Partner: valdiate report
artem1205 Nov 15, 2023
15b34b8
Merge remote-tracking branch 'origin/master' into artem1205/source-am…
artem1205 Nov 17, 2023
9441b07
Source Bing Ads: refactor
artem1205 Nov 17, 2023
55df2da
Source Amazon Seller Partner: fix stream report options
artem1205 Nov 17, 2023
034efe7
Source Amazon Seller Partner: add test for report options
artem1205 Nov 17, 2023
da806d7
Merge remote-tracking branch 'origin/master' into artem1205/source-am…
artem1205 Nov 17, 2023
d2a5632
Source Amazon Seller Partner: add test for report options
artem1205 Nov 17, 2023
b611729
Source Hubspot: fix expected records (#32645)
roman-yermilov-gl Nov 20, 2023
a9cc2b9
Source Github: fix expected records (#32644)
roman-yermilov-gl Nov 20, 2023
e74bbde
✨ Source Mailchimp: Add Interests, InterestCategories, Tags streams (…
ChristoGrab Nov 20, 2023
20f312c
:sparkles: Source Amazon Seller Partner: added unexpected fields (#32…
darynaishchenko Nov 21, 2023
918e64c
🐛 Source Pinterest: Fix backoff waiting time (#32672)
tolik0 Nov 21, 2023
28eab6b
:bug: Source Amazon Seller Partner: Silently exit sync if the retry a…
darynaishchenko Nov 21, 2023
0f28144
🚨🚨🐛 Source Pinterest: Update date-time fields with airbyte_type: time…
tolik0 Nov 21, 2023
dfe9007
:sparkles: Source Amazon Seller Partner: remove required replication …
darynaishchenko Nov 21, 2023
39ac79c
remove bq+snowflake from legacy normalization docs (#32708)
edgao Nov 21, 2023
d90223c
🐛Source Amazon Seller Partner: make AFN inventory streams incremental…
Nov 22, 2023
4774ca8
Merge branch 'source-amazon-seller-partner' into artem1205/source-ama…
Nov 23, 2023
667be92
🐛 Source Pinterest: Fix Advertiser stream names (#32734)
tolik0 Nov 23, 2023
059ca2f
✨ Source Pinterest: Update docs and spec; add missing `placement_traf…
tolik0 Nov 23, 2023
d81ae88
🐛 Source Facebook Marketing: Removed validation that blocked personal…
tolik0 Nov 23, 2023
7037963
Update unit tests
Nov 24, 2023
4192042
Automated Commit - Formatting Changes
askarpets Nov 24, 2023
74d359d
Docs: Add permissions to prereqs in Source Facebook Marketing (#32653)
ChristoGrab Nov 24, 2023
0745c8e
✨Source Amazon Seller Partner: add new streams (#32738)
Nov 27, 2023
1fa3372
Merge branch 'source-amazon-seller-partner' into artem1205/source-ama…
Nov 27, 2023
3e69945
✨ Source Mailchimp: Implement SegmentMembers stream (#32782)
ChristoGrab Nov 27, 2023
4cdfc7a
Source My Hours: Update CDK (#32680)
ChristoGrab Nov 27, 2023
dd9a49e
Update unit tests
Nov 27, 2023
378cb75
Update changelog
Nov 27, 2023
58d846a
Automated Commit - Formatting Changes
askarpets Nov 27, 2023
b5d0ede
Source Cart: Update CDK to Latest Version (#32705)
pnilan Nov 27, 2023
86848a6
Source Twilio: Increase test coverage, fix parse_response bug, update…
pnilan Nov 27, 2023
1fda1df
Source Sendgrid: Increase Test Coverage, Update Expected Records (#32…
pnilan Nov 27, 2023
359e61a
Add descriptive error message on check connection failure
Nov 28, 2023
0c63e5e
Automated Commit - Formatting Changes
askarpets Nov 28, 2023
4f9688f
Update docs structure
Nov 28, 2023
177e067
Merge remote-tracking branch 'origin/artem1205/source-amazon-seller-p…
Nov 28, 2023
a660279
✨Source Amazon Seller Partner: multiple updates (#32833)
Nov 28, 2023
88b24cf
Merge remote-tracking branch 'origin/master' into dev
git-phu Nov 28, 2023
4ca273d
Add config migration
Nov 28, 2023
66a220b
Merge branch 'dev' into artem1205/source-amazon-seller-partner-spec-r…
Nov 28, 2023
bbac52f
Automated Commit - Formatting Changes
askarpets Nov 28, 2023
a8c8b58
Merge branch 'master' into artem1205/source-amazon-seller-partner-spe…
Nov 29, 2023
e070fa3
Merge remote-tracking branch 'origin/artem1205/source-amazon-seller-p…
Nov 29, 2023
3c4c1b3
Delete report_options section from spec
Nov 29, 2023
39f6885
Update migration
Nov 29, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@

from airbyte_cdk.entrypoint import launch
from source_amazon_seller_partner import SourceAmazonSellerPartner
from source_amazon_seller_partner.config_migrations import MigrateAccountType
from source_amazon_seller_partner.config_migrations import MigrateAccountType, MigrateReportOptions

if __name__ == "__main__":
source = SourceAmazonSellerPartner()
MigrateAccountType.migrate(sys.argv[1:], source)
MigrateReportOptions.migrate(sys.argv[1:], source)
launch(source, sys.argv[1:])
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: e55879a8-0ef8-4557-abcf-ab34c53ec460
dockerImageTag: 2.4.0
dockerImageTag: 2.5.0
dockerRepository: airbyte/source-amazon-seller-partner
documentationUrl: https://docs.airbyte.com/integrations/sources/amazon-seller-partner
githubIssueLabel: source-amazon-seller-partner
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


import json
import logging
from typing import Any, List, Mapping

Expand Down Expand Up @@ -33,7 +33,7 @@ def _should_migrate(cls, config: Mapping[str, Any]) -> bool:
"""
This method determines whether config requires migration.
Returns:
> True, if the transformation is neccessary
> True, if the transformation is necessary
> False, otherwise.
"""
return cls.migration_key not in config
Expand Down Expand Up @@ -64,7 +64,79 @@ def _emit_control_message(cls, migrated_config: Mapping[str, Any]) -> None:
def migrate(cls, args: List[str], source: SourceAmazonSellerPartner) -> None:
"""
This method checks the input args, should the config be migrated,
transform if neccessary and emit the CONTROL message.
transform if necessary and emit the CONTROL message.
"""
# get config path
config_path = AirbyteEntrypoint(source).extract_config(args)
# proceed only if `--config` arg is provided
if config_path:
# read the existing config
config = source.read_config(config_path)
# migration check
if cls._should_migrate(config):
cls._emit_control_message(cls._modify_and_save(config_path, source, config))


class MigrateReportOptions:
"""
This class stands for migrating the config at runtime,
while providing the backward compatibility when falling back to the previous source version.

Specifically, starting from `2.0.1`, the `account_type` property becomes required.
For those connector configs that do not contain this key, the default value of `Seller` will be used.
Reverse operation is not needed as this field is ignored in previous versions of the connector.
"""

message_repository: MessageRepository = InMemoryMessageRepository()
migration_key: str = "report_options_list"

@classmethod
def _should_migrate(cls, config: Mapping[str, Any]) -> bool:
"""
This method determines whether config requires migration.
Returns:
> True, if the transformation is necessary
> False, otherwise.
"""
return cls.migration_key not in config and (config.get("report_options") or config.get("advanced_stream_options"))

@classmethod
def _transform_report_options(cls, config: Mapping[str, Any]) -> Mapping[str, Any]:
try:
report_options = json.loads(config.get("report_options", "{}") or "{}")
except json.JSONDecodeError:
report_options = {}

report_options_list = []
for stream_name, options in report_options.items():
options_list = [{"option_name": name, "option_value": value} for name, value in options.items()]
report_options_list.append({"stream_name": stream_name, "options_list": options_list})

config[cls.migration_key] = report_options_list
return config

@classmethod
def _modify_and_save(cls, config_path: str, source: SourceAmazonSellerPartner, config: Mapping[str, Any]) -> Mapping[str, Any]:
# modify the config
migrated_config = cls._transform_report_options(config)
# save the config
source.write_config(migrated_config, config_path)
# return modified config
return migrated_config

@classmethod
def _emit_control_message(cls, migrated_config: Mapping[str, Any]) -> None:
# add the Airbyte Control Message to message repo
cls.message_repository.emit_message(create_connector_config_control_message(migrated_config))
# emit the Airbyte Control Message from message queue to stdout
for message in cls.message_repository.consume_queue():
print(message.json(exclude_unset=True))

@classmethod
def migrate(cls, args: List[str], source: SourceAmazonSellerPartner) -> None:
"""
This method checks the input args, should the config be migrated,
transform if necessary and emit the CONTROL message.
"""
# get config path
config_path = AirbyteEntrypoint(source).extract_config(args)
Expand All @@ -74,6 +146,4 @@ def migrate(cls, args: List[str], source: SourceAmazonSellerPartner) -> None:
config = source.read_config(config_path)
# migration check
if cls._should_migrate(config):
cls._emit_control_message(
cls._modify_and_save(config_path, source, config),
)
cls._emit_control_message(cls._modify_and_save(config_path, source, config))
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from os import getenv
from typing import Any, List, Mapping, Tuple
from typing import Any, List, Mapping, Optional, Tuple

import pendulum
from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from requests import HTTPError
from source_amazon_seller_partner.auth import AWSAuthenticator
from source_amazon_seller_partner.constants import get_marketplaces
from source_amazon_seller_partner.streams import (
Expand Down Expand Up @@ -65,6 +67,7 @@
VendorTrafficReport,
XmlAllOrdersDataByOrderDataGeneral,
)
from source_amazon_seller_partner.utils import AmazonConfigException


class SourceAmazonSellerPartner(AbstractSource):
Expand All @@ -90,9 +93,7 @@ def _get_stream_kwargs(config: Mapping[str, Any]) -> Mapping[str, Any]:
"replication_start_date": start_date,
"marketplace_id": marketplace_id,
"period_in_days": config.get("period_in_days", 90),
"report_options": config.get("report_options"),
"replication_end_date": config.get("replication_end_date"),
"advanced_stream_options": config.get("advanced_stream_options"),
}
return stream_kwargs

Expand All @@ -107,6 +108,8 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->
Show error message in case of request exception or unexpected response.
"""
try:
self.validate_replication_dates(config)
self.validate_stream_report_options(config)
stream_kwargs = self._get_stream_kwargs(config)
orders_stream = Orders(**stream_kwargs)
next(orders_stream.read_records(sync_mode=SyncMode.full_refresh))
Expand All @@ -117,78 +120,111 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->
if isinstance(e, StopIteration):
return True, None

# Additional check, since Vendor-only accounts within Amazon Seller API
# will not pass the test without this exception
# Additional check, since Vendor-only accounts within Amazon Seller API will not pass the test without this exception
if "403 Client Error" in str(e):
stream_to_check = VendorSalesReports(**stream_kwargs)
next(stream_to_check.read_records(sync_mode=SyncMode.full_refresh))
return True, None

return False, e
error_message = e.response.json().get("error_description") if isinstance(e, HTTPError) else e
return False, error_message

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
"""
:param config: A Mapping of the user input configuration as defined in the connector spec.
"""
self.validate_stream_report_options(config)
streams = []
stream_kwargs = self._get_stream_kwargs(config)
streams = [
FbaCustomerReturnsReports(**stream_kwargs),
FbaAfnInventoryReports(**stream_kwargs),
FbaAfnInventoryByCountryReports(**stream_kwargs),
FbaOrdersReports(**stream_kwargs),
FbaShipmentsReports(**stream_kwargs),
FbaReplacementsReports(**stream_kwargs),
FbaStorageFeesReports(**stream_kwargs),
RestockInventoryReports(**stream_kwargs),
FlatFileActionableOrderDataShipping(**stream_kwargs),
FlatFileOpenListingsReports(**stream_kwargs),
FlatFileOrdersReports(**stream_kwargs),
FlatFileOrdersReportsByLastUpdate(**stream_kwargs),
FlatFileSettlementV2Reports(**stream_kwargs),
FulfilledShipmentsReports(**stream_kwargs),
MerchantListingsReports(**stream_kwargs),
VendorDirectFulfillmentShipping(**stream_kwargs),
Orders(**stream_kwargs),
OrderItems(**stream_kwargs),
OrderReportDataShipping(**stream_kwargs),
SellerFeedbackReports(**stream_kwargs),
GetXmlBrowseTreeData(**stream_kwargs),
ListFinancialEventGroups(**stream_kwargs),
ListFinancialEvents(**stream_kwargs),
LedgerDetailedViewReports(**stream_kwargs),
FbaEstimatedFbaFeesTxtReport(**stream_kwargs),
FbaFulfillmentCustomerShipmentPromotionReport(**stream_kwargs),
FbaMyiUnsuppressedInventoryReport(**stream_kwargs),
MerchantCancelledListingsReport(**stream_kwargs),
MerchantListingsReport(**stream_kwargs),
MerchantListingsReportBackCompat(**stream_kwargs),
MerchantListingsInactiveData(**stream_kwargs),
StrandedInventoryUiReport(**stream_kwargs),
XmlAllOrdersDataByOrderDataGeneral(**stream_kwargs),
MerchantListingsFypReport(**stream_kwargs),
FbaSnsForecastReport(**stream_kwargs),
FbaSnsPerformanceReport(**stream_kwargs),
FlatFileArchivedOrdersDataByOrderDate(**stream_kwargs),
FlatFileReturnsDataByReturnDate(**stream_kwargs),
FbaInventoryPlaningReport(**stream_kwargs),
LedgerSummaryViewReport(**stream_kwargs),
FbaReimbursementsReports(**stream_kwargs),
stream_list = [
FbaCustomerReturnsReports,
FbaAfnInventoryReports,
FbaAfnInventoryByCountryReports,
FbaOrdersReports,
FbaShipmentsReports,
FbaReplacementsReports,
FbaStorageFeesReports,
RestockInventoryReports,
FlatFileActionableOrderDataShipping,
FlatFileOpenListingsReports,
FlatFileOrdersReports,
FlatFileOrdersReportsByLastUpdate,
FlatFileSettlementV2Reports,
FulfilledShipmentsReports,
MerchantListingsReports,
VendorDirectFulfillmentShipping,
Orders,
OrderItems,
OrderReportDataShipping,
SellerFeedbackReports,
GetXmlBrowseTreeData,
ListFinancialEventGroups,
ListFinancialEvents,
LedgerDetailedViewReports,
FbaEstimatedFbaFeesTxtReport,
FbaFulfillmentCustomerShipmentPromotionReport,
FbaMyiUnsuppressedInventoryReport,
MerchantCancelledListingsReport,
MerchantListingsReport,
MerchantListingsReportBackCompat,
MerchantListingsInactiveData,
StrandedInventoryUiReport,
XmlAllOrdersDataByOrderDataGeneral,
MerchantListingsFypReport,
FbaSnsForecastReport,
FbaSnsPerformanceReport,
FlatFileArchivedOrdersDataByOrderDate,
FlatFileReturnsDataByReturnDate,
FbaInventoryPlaningReport,
LedgerSummaryViewReport,
FbaReimbursementsReports,
]
# TODO: Remove after Brand Analytics will be enabled in CLOUD:
# https://github.com/airbytehq/airbyte/issues/32353

# TODO: Remove after Brand Analytics will be enabled in CLOUD: https://github.com/airbytehq/airbyte/issues/32353
if getenv("DEPLOYMENT_MODE", "").upper() != "CLOUD":
brand_analytics_reports = [
BrandAnalyticsMarketBasketReports(**stream_kwargs),
BrandAnalyticsSearchTermsReports(**stream_kwargs),
BrandAnalyticsRepeatPurchaseReports(**stream_kwargs),
BrandAnalyticsAlternatePurchaseReports(**stream_kwargs),
BrandAnalyticsItemComparisonReports(**stream_kwargs),
SellerAnalyticsSalesAndTrafficReports(**stream_kwargs),
VendorSalesReports(**stream_kwargs),
VendorInventoryReports(**stream_kwargs),
NetPureProductMarginReport(**stream_kwargs),
RapidRetailAnalyticsInventoryReport(**stream_kwargs),
VendorTrafficReport(**stream_kwargs),
BrandAnalyticsMarketBasketReports,
BrandAnalyticsSearchTermsReports,
BrandAnalyticsRepeatPurchaseReports,
BrandAnalyticsAlternatePurchaseReports,
BrandAnalyticsItemComparisonReports,
SellerAnalyticsSalesAndTrafficReports,
VendorSalesReports,
VendorInventoryReports,
NetPureProductMarginReport,
RapidRetailAnalyticsInventoryReport,
VendorTrafficReport,
]
streams += brand_analytics_reports
stream_list += brand_analytics_reports

for stream in stream_list:
streams.append(stream(**stream_kwargs, report_options=self.get_stream_report_options_list(stream.name, config)))
return streams

@staticmethod
def validate_replication_dates(config: Mapping[str, Any]) -> None:
if (
"replication_start_date" in config
and "replication_end_date" in config
and config["replication_end_date"] < config["replication_start_date"]
):
raise AmazonConfigException(message="End Date should be greater than or equal to Start Date")

@staticmethod
def validate_stream_report_options(config: Mapping[str, Any]) -> None:
if len([x.get("stream_name") for x in config.get("report_options_list", [])]) != len(
set(x.get("stream_name") for x in config.get("report_options_list", []))
):
raise AmazonConfigException(message="Stream name should be unique among all Report options list")
for stream_report_option in config.get("report_options_list", []):
if len([x.get("option_name") for x in stream_report_option.get("options_list")]) != len(
set(x.get("option_name") for x in stream_report_option.get("options_list"))
):
raise AmazonConfigException(
message=f"Option names should be unique for `{stream_report_option.get('stream_name')}` report options"
)

@staticmethod
def get_stream_report_options_list(report_name: str, config: Mapping[str, Any]) -> Optional[List[Mapping[str, Any]]]:
if any(x for x in config.get("report_options_list", []) if x.get("stream_name") == report_name):
return [x.get("options_list") for x in config.get("report_options_list") if x.get("stream_name") == report_name][0]
Loading
Loading