Skip to content

Commit

Permalink
Fix Amazon Seller Partner Connector Formatting. (#3780)
Browse files Browse the repository at this point in the history
* fix formatting

* Correct imports.

* Format.

Co-authored-by: Davin Chia <davinchia@gmail.com>
  • Loading branch information
subodh1810 and davinchia committed Jun 1, 2021
1 parent 430471b commit 06a803d
Show file tree
Hide file tree
Showing 9 changed files with 201 additions and 133 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#
# MIT License
#
# Copyright (c) 2020 Airbyte
Expand All @@ -19,6 +20,7 @@
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#


import pytest
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#
# MIT License
#
# Copyright (c) 2020 Airbyte
Expand All @@ -19,6 +20,7 @@
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#


import sys
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#
# MIT License
#
# Copyright (c) 2020 Airbyte
Expand All @@ -19,14 +20,12 @@
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#


from setuptools import find_packages, setup

MAIN_REQUIREMENTS = [
"airbyte-cdk~=0.1",
"python-amazon-sp-api"
]
MAIN_REQUIREMENTS = ["airbyte-cdk~=0.1", "python-amazon-sp-api"]

TEST_REQUIREMENTS = [
"pytest~=6.1",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,28 @@
from sp_api.api import Reports, Orders
#
# MIT License
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#

from sp_api.api import Orders, Reports
from sp_api.base import Marketplaces


Expand All @@ -24,15 +48,12 @@ class AmazonClient:
"Turkey": Marketplaces.TR,
"UAE": Marketplaces.AE,
"UK": Marketplaces.UK,
"USA": Marketplaces.US
"USA": Marketplaces.US,
}

GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL = "GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL"
ORDERS = "Orders"
CURSORS = {
GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL: "purchase-date",
ORDERS: "LastUpdateDate"
}
CURSORS = {GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL: "purchase-date", ORDERS: "LastUpdateDate"}

_REPORT_ENTITIES = [GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL]
_OTHER_ENTITIES = [ORDERS]
Expand All @@ -56,21 +77,21 @@ def get_cursor_for_stream(self, stream_name: str) -> str:
def fetch_orders(self, updated_after: str, page_size: int, next_token: str) -> any:
page_count = page_size or self.PAGECOUNT
response = Orders(credentials=self.credentials, marketplace=self.marketplace).get_orders(
LastUpdatedAfter=updated_after, MaxResultsPerPage=page_count, NextToken=next_token)
LastUpdatedAfter=updated_after, MaxResultsPerPage=page_count, NextToken=next_token
)
return response.payload

def request_report(self, report_type: str, data_start_time: str, data_end_time: str) -> any:
response = Reports(credentials=self.credentials, marketplace=self.marketplace).create_report(
reportType=report_type, dataStartTime=data_start_time, dataEndTime=data_end_time)
reportType=report_type, dataStartTime=data_start_time, dataEndTime=data_end_time
)

return response.payload

def get_report(self, report_id: str):
response = Reports(credentials=self.credentials,
marketplace=Marketplaces.IN).get_report(report_id=report_id)
response = Reports(credentials=self.credentials, marketplace=Marketplaces.IN).get_report(report_id=report_id)
return response.payload

def get_report_document(self, report_document_id: str):
response = Reports(credentials=self.credentials, marketplace=Marketplaces.IN).get_report_document(
report_document_id, decrypt=True)
response = Reports(credentials=self.credentials, marketplace=Marketplaces.IN).get_report_document(report_document_id, decrypt=True)
return response.payload
Original file line number Diff line number Diff line change
@@ -1,15 +1,37 @@
#
# MIT License
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#

import json
import time
import pkgutil
import dateutil
from datetime import datetime, timedelta, date
from dateutil.relativedelta import *
import time
from datetime import date, datetime, timedelta
from typing import DefaultDict, Dict, Generator

from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import AirbyteStream, AirbyteMessage, AirbyteStateMessage, AirbyteRecordMessage, Type

from sp_api.base import Marketplaces
from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage, AirbyteStateMessage, AirbyteStream, Type
from dateutil import parser
from dateutil.relativedelta import relativedelta

from .amazon import AmazonClient

Expand All @@ -18,8 +40,17 @@ class BaseClient:
MAX_SLEEP_TIME = 512
CONVERSION_WINDOW_DAYS = 14

def __init__(self, refresh_token: str, lwa_app_id: str, lwa_client_secret: str, aws_secret_key: str, aws_access_key: str, role_arn: str,
start_date: str, marketplace: str = "USA"):
def __init__(
self,
refresh_token: str,
lwa_app_id: str,
lwa_client_secret: str,
aws_secret_key: str,
aws_access_key: str,
role_arn: str,
start_date: str,
marketplace: str = "USA",
):
self.credentials = dict(
refresh_token=refresh_token,
lwa_app_id=lwa_app_id,
Expand All @@ -29,27 +60,22 @@ def __init__(self, refresh_token: str, lwa_app_id: str, lwa_client_secret: str,
role_arn=role_arn,
)
self.start_date = start_date
self._amazon_client = AmazonClient(
credentials=self.credentials, marketplace=marketplace)
self._amazon_client = AmazonClient(credentials=self.credentials, marketplace=marketplace)

def check_connection(self):
updated_after = (
datetime.utcnow() - timedelta(days=self.CONVERSION_WINDOW_DAYS)).isoformat()
updated_after = (datetime.utcnow() - timedelta(days=self.CONVERSION_WINDOW_DAYS)).isoformat()
return self._amazon_client.fetch_orders(updated_after, 10, None)

def get_streams(self):
streams = []
for entity in self._amazon_client.get_entities():
raw_schema = json.loads(pkgutil.get_data(
self.__class__.__module__.split(".")[0], f"schemas/{entity}.json"))
raw_schema = json.loads(pkgutil.get_data(self.__class__.__module__.split(".")[0], f"schemas/{entity}.json"))
streams.append(AirbyteStream.parse_obj(raw_schema))
return streams

def read_stream(self, logger: AirbyteLogger, stream_name: str, state: DefaultDict[str, any]) -> Generator[AirbyteMessage, None, None]:
cursor_field = self._amazon_client.get_cursor_for_stream(
stream_name)
cursor_value = self._get_cursor_or_none(
state, stream_name, cursor_field) or self.start_date
cursor_field = self._amazon_client.get_cursor_for_stream(stream_name)
cursor_value = self._get_cursor_or_none(state, stream_name, cursor_field) or self.start_date

if cursor_value > date.today().isoformat():
state[stream_name][cursor_field] = date.today().isoformat()
Expand All @@ -64,32 +90,27 @@ def read_stream(self, logger: AirbyteLogger, stream_name: str, state: DefaultDic
PAGE = 1
while HAS_NEXT:
logger.info(f"Pulling for page: {PAGE}")
response = self._amazon_client.fetch_orders(
current_date, self._amazon_client.PAGECOUNT, NEXT_TOKEN)
response = self._amazon_client.fetch_orders(current_date, self._amazon_client.PAGECOUNT, NEXT_TOKEN)
orders = response["Orders"]
if "NextToken" in response:
NEXT_TOKEN = response["NextToken"]
HAS_NEXT = True if NEXT_TOKEN else False
PAGE = PAGE + 1
for order in orders:
current_date = dateutil.parser.parse(
order[cursor_field]).date().isoformat()
cursor_value = max(
current_date, cursor_value) if cursor_value else current_date
current_date = parser.parse(order[cursor_field]).date().isoformat()
cursor_value = max(current_date, cursor_value) if cursor_value else current_date
yield self._record(stream=stream_name, data=order)

if cursor_value:
state[stream_name][cursor_field] = (date.fromisoformat(
cursor_value) + relativedelta(days=1)).isoformat()
state[stream_name][cursor_field] = (date.fromisoformat(cursor_value) + relativedelta(days=1)).isoformat()
yield self._state(state)

# Sleep for 2 seconds
time.sleep(2)

def read_reports(self, logger: AirbyteLogger, stream_name: str, state: DefaultDict[str, any]) -> Generator[AirbyteMessage, None, None]:
cursor_field = self._amazon_client.get_cursor_for_stream(stream_name)
cursor_value = self._get_cursor_or_none(
state, stream_name, cursor_field) or self.start_date
cursor_value = self._get_cursor_or_none(state, stream_name, cursor_field) or self.start_date

if cursor_value > date.today().isoformat():
state[stream_name][cursor_field] = date.today().isoformat()
Expand All @@ -104,28 +125,23 @@ def read_reports(self, logger: AirbyteLogger, stream_name: str, state: DefaultDi

# Request for the report
logger.info(f"Requested report from {start_date} to {end_date}")
response = self._amazon_client.request_report(
stream_name, start_date, end_date)
response = self._amazon_client.request_report(stream_name, start_date, end_date)
reportId = response["reportId"]

# Wait for the report status
document_id = self._wait_for_report(logger,
self._amazon_client, reportId)
document_id = self._wait_for_report(logger, self._amazon_client, reportId)

# Pull data for a report
data = self._amazon_client.get_report_document(document_id)

# Loop through all records and yield
for row in self._get_records(data):
current_cursor_value = datetime.fromisoformat(
row[cursor_field]).date().isoformat()
cursor_value = max(
current_cursor_value, cursor_value) if cursor_value else current_cursor_value
current_cursor_value = datetime.fromisoformat(row[cursor_field]).date().isoformat()
cursor_value = max(current_cursor_value, cursor_value) if cursor_value else current_cursor_value
yield self._record(stream=stream_name, data=row)

if cursor_value:
state[stream_name][cursor_field] = self._get_cursor_state(
cursor_value, end_date)
state[stream_name][cursor_field] = self._get_cursor_state(cursor_value, end_date)
yield self._state(state)

current_date = self._increase_date_by_month(current_date)
Expand All @@ -143,11 +159,11 @@ def _wait_for_report(self, logger, amazon_client: AmazonClient, reportId: str):
while True:
response = amazon_client.get_report(reportId)
if response["processingStatus"] == "DONE":
logger.info(f"Report status: DONE")
logger.info("Report status: DONE")
document_id = response["reportDocumentId"]
return document_id
if current_sleep_time > self.MAX_SLEEP_TIME:
logger.error(f"Max wait reached")
logger.error("Max wait reached")
raise Exception("Max wait time reached")

logger.info(f"Sleeping for {current_sleep_time}")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#
# MIT License
#
# Copyright (c) 2020 Airbyte
Expand All @@ -19,10 +20,11 @@
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#


import json
from typing import Dict, Generator, Mapping, DefaultDict
from typing import DefaultDict, Dict, Generator, Mapping

from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import (
Expand All @@ -32,7 +34,7 @@
ConfiguredAirbyteCatalog,
ConfiguredAirbyteStream,
Status,
SyncMode
SyncMode,
)
from airbyte_cdk.sources import Source

Expand All @@ -49,7 +51,7 @@ def _get_client(self, config: Mapping):

def check(self, logger: AirbyteLogger, config: json) -> AirbyteConnectionStatus:
client = self._get_client(config)
logger.info(f"Checking access to Amazon SP-API")
logger.info("Checking access to Amazon SP-API")
try:
client.check_connection()
return AirbyteConnectionStatus(status=Status.SUCCEEDED)
Expand Down
Loading

0 comments on commit 06a803d

Please sign in to comment.