Skip to content

Commit

Permalink
🐛 Source Amazon Seller Partner: Fix check command to check access to …
Browse files Browse the repository at this point in the history
…correct streams (#35062)
  • Loading branch information
tolik0 authored and xiaohansong committed Feb 27, 2024
1 parent 5e2b7e0 commit 10e7f67
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: e55879a8-0ef8-4557-abcf-ab34c53ec460
dockerImageTag: 3.2.2
dockerImageTag: 3.3.0
dockerRepository: airbyte/source-amazon-seller-partner
documentationUrl: https://docs.airbyte.com/integrations/sources/amazon-seller-partner
githubIssueLabel: source-amazon-seller-partner
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,21 +110,20 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->
self.validate_replication_dates(config)
self.validate_stream_report_options(config)
stream_kwargs = self._get_stream_kwargs(config)
orders_stream = Orders(**stream_kwargs)
next(orders_stream.read_records(sync_mode=SyncMode.full_refresh))

if config.get("account_type", "Seller") == "Seller":
stream_to_check = Orders(**stream_kwargs)
else:
stream_to_check = VendorSalesReports(**stream_kwargs)

next(stream_to_check.read_records(sync_mode=SyncMode.full_refresh))

return True, None
except Exception as e:
# Validate Orders stream without data
# Validate stream without data
if isinstance(e, StopIteration):
return True, None

# Additional check, since Vendor-only accounts within Amazon Seller API will not pass the test without this exception
if "403 Client Error" in str(e):
stream_to_check = VendorSalesReports(**stream_kwargs)
next(stream_to_check.read_records(sync_mode=SyncMode.full_refresh))
return True, None

error_message = e.response.json().get("error_description") if isinstance(e, HTTPError) else e
return False, error_message

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,6 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late
def stream_slices(
self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:

start_date = pendulum.parse(self._replication_start_date)
end_date = pendulum.now("utc").subtract(days=self.availability_sla_days)

Expand Down Expand Up @@ -1260,7 +1259,6 @@ def _create_report(
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
) -> Mapping[str, Any]:

# For backwards
return {"reportId": stream_slice.get("report_id")}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def test_augmented_data_incorrect_period(self, report_init_kwargs):
report_options = {"reportPeriod": "DAYS123"}
with pytest.raises(Exception) as e:
stream._augmented_data(report_options)
assert e.value.args[0] == [{'message': 'This reportPeriod is not implemented.'}]
assert e.value.args[0] == [{"message": "This reportPeriod is not implemented."}]

@pytest.mark.parametrize(
("report_options", "report_option_dates"),
Expand Down Expand Up @@ -90,9 +90,10 @@ def test_report_data(self, report_init_kwargs, stream_slice):
stream = SomeIncrementalAnalyticsStream(**report_init_kwargs)
expected_data = {"reportType": stream.name, "marketplaceIds": [report_init_kwargs["marketplace_id"]]}
expected_data.update(stream_slice)
assert stream._report_data(
sync_mode=SyncMode.incremental, cursor_field=[stream.cursor_field], stream_slice=stream_slice
) == expected_data
assert (
stream._report_data(sync_mode=SyncMode.incremental, cursor_field=[stream.cursor_field], stream_slice=stream_slice)
== expected_data
)

@pytest.mark.parametrize(
("current_stream_state", "latest_record", "expected_date"),
Expand Down Expand Up @@ -143,6 +144,7 @@ def test_stream_slices(self, report_init_kwargs, start_date, end_date, stream_st
stream = SomeIncrementalAnalyticsStream(**report_init_kwargs)
stream.fixed_period_in_days = fixed_period_in_days
with patch("pendulum.now", return_value=pendulum.parse("2023-09-09T00:00:00Z")):
assert list(
stream.stream_slices(sync_mode=SyncMode.incremental, cursor_field=[stream.cursor_field], stream_state=stream_state)
) == expected_slices
assert (
list(stream.stream_slices(sync_mode=SyncMode.incremental, cursor_field=[stream.cursor_field], stream_state=stream_state))
== expected_slices
)
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class TestMigrateReportOptions:
("input_config", "expected_report_options_list"),
(
(
{"report_options": "{\"GET_REPORT\": {\"reportPeriod\": \"WEEK\"}}"},
{"report_options": '{"GET_REPORT": {"reportPeriod": "WEEK"}}'},
[{"stream_name": "GET_REPORT", "options_list": [{"option_name": "reportPeriod", "option_value": "WEEK"}]}],
),
({"report_options": None}, []),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,30 @@ def connector_config_with_report_options():
"lwa_app_id": "amzn1.application-oa2-client.abc123",
"lwa_client_secret": "abc123",
"aws_environment": "SANDBOX",
"account_type": "Seller",
"region": "US",
"report_options_list": [
{
"stream_name": "GET_FBA_FULFILLMENT_CUSTOMER_RETURNS_DATA",
"options_list": [
{"option_name": "some_name_1", "option_value": "some_value_1"},
{"option_name": "some_name_2", "option_value": "some_value_2"},
],
},
],
}


@pytest.fixture
def connector_vendor_config_with_report_options():
return {
"replication_start_date": "2017-01-25T00:00:00Z",
"replication_end_date": "2017-02-25T00:00:00Z",
"refresh_token": "Atzr|IwEBIP-abc123",
"lwa_app_id": "amzn1.application-oa2-client.abc123",
"lwa_client_secret": "abc123",
"aws_environment": "SANDBOX",
"account_type": "Vendor",
"region": "US",
"report_options_list": [
{
Expand All @@ -48,23 +72,17 @@ def connector_config_without_start_date():
}


def test_check_connection_with_vendor_report(mocker, requests_mock, connector_config_with_report_options):
def test_check_connection_with_vendor_report(mocker, requests_mock, connector_vendor_config_with_report_options):
mocker.patch("time.sleep", lambda x: None)
requests_mock.register_uri(
"POST",
"https://api.amazon.com/auth/o2/token",
status_code=200,
json={"access_token": "access_token", "expires_in": "3600"},
)
requests_mock.register_uri(
"GET",
"https://sandbox.sellingpartnerapi-na.amazon.com/orders/v0/orders",
status_code=403,
json={"error": "forbidden"},
)

with patch.object(VendorSalesReports, "read_records", return_value=iter([{"some_key": "some_value"}])):
assert SourceAmazonSellerPartner().check_connection(logger, connector_config_with_report_options) == (True, None)
assert SourceAmazonSellerPartner().check_connection(logger, connector_vendor_config_with_report_options) == (True, None)


def test_check_connection_with_orders_stop_iteration(requests_mock, connector_config_with_report_options):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def test_report_data(self, report_init_kwargs):
None,
[
{"dataStartTime": "2021-10-01T00:00:00Z", "dataEndTime": "2022-09-30T23:59:59Z"},
{"dataStartTime": "2022-10-01T00:00:00Z", "dataEndTime": "2023-01-01T00:00:00Z"}
{"dataStartTime": "2022-10-01T00:00:00Z", "dataEndTime": "2023-01-01T00:00:00Z"},
],
),
(
Expand Down Expand Up @@ -129,8 +129,10 @@ def test_read_records_retrieve_fatal(self, report_init_kwargs, mocker, requests_
stream_start = "2022-09-03T00:00:00Z"
stream_end = "2022-10-03T00:00:00Z"
with pytest.raises(AirbyteTracedException) as e:
list(stream.read_records(
sync_mode=SyncMode.full_refresh, stream_slice={"dataStartTime": stream_start, "dataEndTime": stream_end})
list(
stream.read_records(
sync_mode=SyncMode.full_refresh, stream_slice={"dataStartTime": stream_start, "dataEndTime": stream_end}
)
)
assert e.value.internal_message == (
f"Failed to retrieve the report 'GET_TEST_REPORT' for period {stream_start}-{stream_end} "
Expand Down Expand Up @@ -210,11 +212,7 @@ def test_read_records_retrieve_forbidden(self, report_init_kwargs, mocker, reque

report_id = "some_report_id"
requests_mock.register_uri(
"POST",
"https://test.url/reports/2021-06-30/reports",
status_code=403,
json={"reportId": report_id},
reason="Forbidden"
"POST", "https://test.url/reports/2021-06-30/reports", status_code=403, json={"reportId": report_id}, reason="Forbidden"
)

stream = SomeReportStream(**report_init_kwargs)
Expand Down
Loading

0 comments on commit 10e7f67

Please sign in to comment.