Skip to content

Commit

Permalink
🐛 Source Amazon Seller Partner: handle start date for financial stream (
Browse files Browse the repository at this point in the history
#13633)

* start and end date for finacial stream should not be more than 180 days apart

* improve unit tests

* make changes to start date for finance stream

* update tests

* lint changes

* update version to 0.2.22 for source-amazon-seller-partner
  • Loading branch information
ganpatagarwal committed Jun 18, 2022
1 parent 5852989 commit b338014
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
- name: Amazon Seller Partner
sourceDefinitionId: e55879a8-0ef8-4557-abcf-ab34c53ec460
dockerRepository: airbyte/source-amazon-seller-partner
dockerImageTag: 0.2.21
dockerImageTag: 0.2.22
sourceType: api
documentationUrl: https://docs.airbyte.io/integrations/sources/amazon-seller-partner
icon: amazonsellerpartner.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@
type: "string"
path_in_connector_config:
- "client_secret"
- dockerImage: "airbyte/source-amazon-seller-partner:0.2.21"
- dockerImage: "airbyte/source-amazon-seller-partner:0.2.22"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/amazon-seller-partner"
changelogUrl: "https://docs.airbyte.io/integrations/sources/amazon-seller-partner"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.21
LABEL io.airbyte.version=0.2.22
LABEL io.airbyte.name=airbyte/source-amazon-seller-partner
Original file line number Diff line number Diff line change
Expand Up @@ -764,14 +764,24 @@ def request_params(
if next_page_token:
return dict(next_page_token)

params = {self.replication_start_date_field: self._replication_start_date, self.page_size_field: self.page_size}

# for finance APIs, end date-time must be no later than two minutes before the request was submitted
end_date = pendulum.now("utc").subtract(minutes=2, seconds=10).strftime(DATE_TIME_FORMAT)
if self._replication_end_date:
end_date = self._replication_end_date

params[self.replication_end_date_field] = end_date
# start date and end date should not be more than 180 days apart.
start_date = max(pendulum.parse(self._replication_start_date), pendulum.parse(end_date).subtract(days=180)).strftime(
DATE_TIME_FORMAT
)

# logging to make sure user knows taken start date
logger.info("start date used: %s", start_date)

params = {
self.replication_start_date_field: start_date,
self.replication_end_date_field: end_date,
self.page_size_field: self.page_size,
}
return params

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import pendulum
import pytest
import requests
from source_amazon_seller_partner.auth import AWSSignature
Expand Down Expand Up @@ -83,100 +84,134 @@
}
}

DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"

START_DATE_1 = "2022-05-25T00:00:00Z"
END_DATE_1 = "2022-05-26T00:00:00Z"

START_DATE_2 = "2021-01-01T00:00:00Z"
END_DATE_2 = "2022-07-31T00:00:00Z"


@pytest.fixture
def list_financial_event_groups_stream():
aws_signature = AWSSignature(
service="execute-api",
aws_access_key_id="AccessKeyId",
aws_secret_access_key="SecretAccessKey",
aws_session_token="SessionToken",
region="US",
)
stream = ListFinancialEventGroups(
url_base="https://test.url",
aws_signature=aws_signature,
replication_start_date="2022-05-25T00:00:00Z",
replication_end_date="2022-05-26T00:00:00Z",
marketplace_id="id",
authenticator=None,
period_in_days=0,
report_options=None,
max_wait_seconds=500,
)
return stream
def _internal(start_date: str = START_DATE_1, end_date: str = END_DATE_1):
aws_signature = AWSSignature(
service="execute-api",
aws_access_key_id="AccessKeyId",
aws_secret_access_key="SecretAccessKey",
aws_session_token="SessionToken",
region="US",
)
stream = ListFinancialEventGroups(
url_base="https://test.url",
aws_signature=aws_signature,
replication_start_date=start_date,
replication_end_date=end_date,
marketplace_id="id",
authenticator=None,
period_in_days=0,
report_options=None,
max_wait_seconds=500,
)
return stream

return _internal


@pytest.fixture
def list_financial_events_stream():
aws_signature = AWSSignature(
service="execute-api",
aws_access_key_id="AccessKeyId",
aws_secret_access_key="SecretAccessKey",
aws_session_token="SessionToken",
region="US",
)
stream = ListFinancialEvents(
url_base="https://test.url",
aws_signature=aws_signature,
replication_start_date="2022-05-25T00:00:00Z",
replication_end_date="2022-05-26T00:00:00Z",
marketplace_id="id",
authenticator=None,
period_in_days=0,
report_options=None,
max_wait_seconds=500,
)
return stream
def _internal(start_date: str = START_DATE_1, end_date: str = END_DATE_1):
aws_signature = AWSSignature(
service="execute-api",
aws_access_key_id="AccessKeyId",
aws_secret_access_key="SecretAccessKey",
aws_session_token="SessionToken",
region="US",
)
stream = ListFinancialEvents(
url_base="https://test.url",
aws_signature=aws_signature,
replication_start_date=start_date,
replication_end_date=end_date,
marketplace_id="id",
authenticator=None,
period_in_days=0,
report_options=None,
max_wait_seconds=500,
)
return stream

return _internal


def test_finance_stream_next_token(mocker, list_financial_event_groups_stream):
response = requests.Response()
token = "aabbccddeeff"
expected = {"NextToken": token}
mocker.patch.object(response, "json", return_value={"payload": expected})
assert expected == list_financial_event_groups_stream.next_page_token(response)
assert expected == list_financial_event_groups_stream().next_page_token(response)

mocker.patch.object(response, "json", return_value={"payload": {}})
if list_financial_event_groups_stream.next_page_token(response) is not None:
if list_financial_event_groups_stream().next_page_token(response) is not None:
assert False


def test_financial_event_groups_stream_request_params(list_financial_event_groups_stream):
params = {
"FinancialEventGroupStartedAfter": "2022-05-25T00:00:00Z",
# test 1
expected_params = {
"FinancialEventGroupStartedAfter": START_DATE_1,
"MaxResultsPerPage": 100,
"FinancialEventGroupStartedBefore": "2022-05-26T00:00:00Z",
"FinancialEventGroupStartedBefore": END_DATE_1,
}
assert params == list_financial_event_groups_stream.request_params({}, None)
assert expected_params == list_financial_event_groups_stream().request_params({}, None)

# test 2
token = "aabbccddeeff"
params = {"NextToken": token}
assert params == list_financial_event_groups_stream.request_params({}, {"NextToken": token})
expected_params = {"NextToken": token}
assert expected_params == list_financial_event_groups_stream().request_params({}, {"NextToken": token})

# test 3 - for 180 days limit
expected_params = {
"FinancialEventGroupStartedAfter": pendulum.parse(END_DATE_2).subtract(days=180).strftime(DATE_TIME_FORMAT),
"MaxResultsPerPage": 100,
"FinancialEventGroupStartedBefore": END_DATE_2,
}
assert expected_params == list_financial_event_groups_stream(START_DATE_2, END_DATE_2).request_params({}, None)


def test_financial_event_groups_stream_parse_response(mocker, list_financial_event_groups_stream):
response = requests.Response()
mocker.patch.object(response, "json", return_value=list_financial_event_groups_data)

for record in list_financial_event_groups_stream.parse_response(response, {}):
for record in list_financial_event_groups_stream().parse_response(response, {}):
assert record == list_financial_event_groups_data.get("payload").get("FinancialEventGroupList")[0]


def test_financial_events_stream_request_params(list_financial_events_stream):
params = {"PostedAfter": "2022-05-25T00:00:00Z", "MaxResultsPerPage": 100, "PostedBefore": "2022-05-26T00:00:00Z"}
assert params == list_financial_events_stream.request_params({}, None)
# test 1
expected_params = {"PostedAfter": START_DATE_1, "MaxResultsPerPage": 100, "PostedBefore": END_DATE_1}
assert expected_params == list_financial_events_stream().request_params({}, None)

# test 2
token = "aabbccddeeff"
params = {"NextToken": token}
assert params == list_financial_events_stream.request_params({}, {"NextToken": token})
expected_params = {"NextToken": token}
assert expected_params == list_financial_events_stream().request_params({}, {"NextToken": token})

# test 3 - for 180 days limit
expected_params = {
"PostedAfter": pendulum.parse(END_DATE_2).subtract(days=180).strftime(DATE_TIME_FORMAT),
"MaxResultsPerPage": 100,
"PostedBefore": END_DATE_2,
}
assert expected_params == list_financial_events_stream(START_DATE_2, END_DATE_2).request_params({}, None)


def test_financial_events_stream_parse_response(mocker, list_financial_events_stream):
response = requests.Response()
mocker.patch.object(response, "json", return_value=list_financial_events_data)

for record in list_financial_events_stream.parse_response(response, {}):
for record in list_financial_events_stream().parse_response(response, {}):
assert list_financial_events_data.get("payload").get("FinancialEvents").get("ShipmentEventList") == record.get("ShipmentEventList")
assert list_financial_events_data.get("payload").get("FinancialEvents").get("RefundEventList") == record.get("RefundEventList")
assert list_financial_events_data.get("payload").get("FinancialEvents").get("AdjustmentEventList") == record.get(
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/amazon-seller-partner.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ This source is capable of syncing the following tables and their data:

| Version | Date | Pull Request | Subject |
|:---------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------|
| `0.2.22` | 2022-06-15 | [\#13633](https://github.com/airbytehq/airbyte/pull/13633) | Fix - handle start date for financial stream |
| `0.2.21` | 2022-06-01 | [\#13364](https://github.com/airbytehq/airbyte/pull/13364) | Add financial streams |
| `0.2.20` | 2022-05-30 | [\#13059](https://github.com/airbytehq/airbyte/pull/13059) | Add replication end date to config |
| `0.2.19` | 2022-05-24 | [\#13119](https://github.com/airbytehq/airbyte/pull/13119) | Add OAuth2.0 support |
Expand Down

0 comments on commit b338014

Please sign in to comment.