In [1]:
import json
import pyodbc
import pandas as pd
import numpy as np
from sqlalchemy import create_engine, select, MetaData, Table
import requests
import sqlalchemy as sa
import urllib
import sql_scripts as ss
from datetime import date, datetime, timedelta
from threading import Thread

In [2]:
def getAuthforWMS(f_data):
    """Return the first entry stored under the ``"wmsAccess"`` key.

    Parameters
    ----------
    f_data : mapping
        Parsed auth data; ``f_data["wmsAccess"]`` must be an indexable,
        non-empty sequence.

    Returns
    -------
    object
        ``f_data["wmsAccess"][0]``, unchanged.
    """
    return f_data["wmsAccess"][0]

In [3]:
#gets connections
def getConnforMYSQL(f_data, accessType):
    """Build a SQLAlchemy engine from the ``accessType`` entry of the auth data.

    Despite the name, two kinds of target are handled:

    * ``accessType == "azureAccess"``: an ODBC connection string is assembled,
      URL-quoted and appended to the configured ``dialect_driver`` prefix. The
      ODBC driver used is the last one reported by ``pyodbc.drivers()``.
    * any other ``accessType``: a URL of the form
      ``<dialect_driver><uid>:<pwd>@<server>:<port>/<database>`` is built.

    Parameters
    ----------
    f_data : mapping
        Parsed auth data. ``f_data[accessType][0]`` must provide the keys
        ``server``, ``database``, ``uid``, ``pwd``, ``dialect_driver`` and
        ``port``.
    accessType : str
        Key of the connection entry to use.

    Returns
    -------
    Engine
        The result of ``create_engine(...).execution_options(
        isolation_level="AUTOCOMMIT")``.

    Raises
    ------
    KeyError
        If the entry or one of its keys is missing.
    IndexError
        Only for ``"azureAccess"``: if ``pyodbc.drivers()`` returns no driver.
    """
    access = f_data[accessType][0]
    server = access["server"]
    db = access["database"]
    uid = access["uid"]
    pwd = access["pwd"]
    driver = access["dialect_driver"]
    port = access["port"]

    if accessType == "azureAccess":
        # `import urllib` alone does not guarantee the `parse` submodule is
        # loaded, so import it explicitly where it is needed.
        import urllib.parse

        # Look the ODBC driver up only on this branch, so the non-ODBC branch
        # keeps working on hosts that have no ODBC driver installed.
        dialect = pyodbc.drivers()[-1]
        connection_string = (
            " Driver={%s}" % dialect +
            "; SERVER=%s" % server +
            "; Database=%s " % db +
            "; UID=%s" % uid +
            "; PWD=%s" % pwd
        )
        quoted = driver + urllib.parse.quote_plus(connection_string)
        engine = create_engine(quoted, fast_executemany=True).execution_options(isolation_level="AUTOCOMMIT")
    else:
        # NOTE(review): uid/pwd are inserted verbatim; values containing
        # URL-reserved characters (e.g. "@" or "/") presumably have to be
        # percent-encoded in the config already - confirm.
        quoted = driver + uid + ":" + pwd + "@" + server + ":" + str(port) + "/" + db
        engine = create_engine(quoted).execution_options(isolation_level="AUTOCOMMIT")

    return engine

In [4]:
#Extracts data from DB
def extractionFunction(conn_integrator, sql_text):
    """Execute ``sql_text`` and return its rows and column names.

    The statement is wrapped with ``sa.text`` and executed on
    ``conn_integrator`` inside the context manager returned by
    ``conn_integrator.begin()``.

    Parameters
    ----------
    conn_integrator : connection
        Object providing ``begin()`` and ``execute()``.
    sql_text : str
        SQL statement to run.

    Returns
    -------
    tuple[list, list]
        ``(rows, column_names)`` - note that rows come first.
    """
    with conn_integrator.begin():
        statement = sa.text(sql_text)
        cursor = conn_integrator.execute(statement)

        # Materialise both sequences while the result is still open.
        column_names = list(cursor.keys())
        fetched_rows = list(cursor)

    return fetched_rows, column_names

In [5]:
#get response from API
def setupAPIrequest(schemeHTTP, baseHTTP, extraHTTP, headers, timeout=60):
    """Send a JSON ``GET`` request and return the response object.

    The URL is the plain concatenation ``schemeHTTP + baseHTTP + extraHTTP``.
    ``Accept`` and ``Content-Type`` are forced to ``application/json`` on a
    copy of ``headers``; the caller's dict is left untouched, so a dict shared
    between threads is never written to.

    Parameters
    ----------
    schemeHTTP : str
        URL scheme including the separator, e.g. ``"https://"``.
    baseHTTP : str
        Host part of the URL.
    extraHTTP : str
        Path and query string.
    headers : mapping
        Extra request headers (e.g. API credentials).
    timeout : float or tuple, optional
        Passed to ``requests.get``. Defaults to 60 seconds so that a stalled
        connection raises instead of blocking the calling thread forever.

    Returns
    -------
    requests.Response
        The response returned by ``requests.get``.
    """
    #adds default headers (on a copy, so the caller's dict is not mutated)
    requestHeaders = dict(headers)
    requestHeaders['Accept'] = "application/json"
    requestHeaders['Content-Type'] = "application/json"

    completeHTTP = schemeHTTP + baseHTTP + extraHTTP
    return requests.get(completeHTTP, headers=requestHeaders, timeout=timeout)

In [6]:
class threadMain:
    """Download all order pages of one account on a background thread.

    Creating an instance immediately starts a thread running
    ``threadMainTask``. That task requests page 1 of the orders listing for
    the account found at ``rows_accountList[index]`` and then starts one
    sub-thread per remaining page. Results are written to module-level
    globals that the caller must initialise beforehand:

    * ``dataList``  - list extended with the ``"list"`` items of every page;
    * ``errorList`` - list receiving one dict per account whose first request
      did not return HTTP 200.

    Parameters
    ----------
    index : int
        Position of the account row inside ``rows_accountList``.
    columns_accountList : sequence of str
        Column names of the account rows; must contain ``"CONTA"``,
        ``"CHAVE"`` and ``"TOKEN"``.
    rows_accountList : sequence
        Account rows, each aligned with ``columns_accountList``.
    file : str
        Key into ``ss.queryList`` giving the date formats and time window.
    """

    # Path + query template shared by the first request and the page sub-threads.
    _ordersPath = "/api/oms/pvt/orders?per_page=100&page=%s&f_creationDate=creationDate:[%s TO %s]"

    def __init__(self, index, columns_accountList, rows_accountList, file):
        self.subThreads = []
        self.file = file
        self.index = index
        self.columns_accountList = columns_accountList
        self.rows_accountList = rows_accountList

        # Every attribute is set before the thread starts reading them.
        self.t = Thread(target=self.threadMainTask, args=( ))
        self.t.start()

    def getThread(self):
        """Return the main ``Thread`` so the caller can ``join()`` it."""
        return (self.t)

    def threadSubTask(self, pageNumber, schemeHTTP, baseHTTP, headers, yesterday, now):
        """Fetch one extra page and add its orders to the global ``dataList``.

        If the decoded body has no ``"list"`` key it is printed instead.
        """
        extraHTTP = self._ordersPath % (pageNumber, yesterday, now)
        response = setupAPIrequest(schemeHTTP, baseHTTP, extraHTTP, headers)

        # Decode the body once instead of once per access.
        payload = response.json()
        if "list" in payload:
            # In-place extend: `dataList = dataList + ...` is a read-modify-write
            # that can drop another thread's page when two threads interleave.
            globals()["dataList"].extend(payload["list"])
        else:
            print (payload)

    def threadMainTask(self):
        """Fetch page 1 for this account, then fan out one sub-thread per page.

        On a non-200 first response, a dict with ``account``, ``status_code``
        and ``time`` is appended to the global ``errorList`` instead.

        Raises
        ------
        KeyError
            If the ``CONTA``, ``CHAVE`` or ``TOKEN`` column is missing.
        """
        index = self.index
        #begin a MainThread
        print(f'Starting Thread index: {index}')

        # Column name -> value for this account's row.
        accountRow = dict(zip(self.columns_accountList, self.rows_accountList[index]))
        account = accountRow["CONTA"]
        key = accountRow["CHAVE"]
        token = accountRow["TOKEN"]

        # Take the clock once so both ends of the window share the same instant.
        queryConfig = ss.queryList[self.file]
        current = datetime.now()
        now = current.strftime(queryConfig['nowStrftime'])
        yesterday = (current - timedelta(days=queryConfig['timedelta'])).strftime(queryConfig['yesterdayStrftime'])

        schemeHTTP = "https://"
        baseHTTP = "%s.%s.com.br" % (account, "vtexcommercestable")
        headers = {
                'X-VTEX-API-AppKey': key,
                'X-VTEX-API-AppToken': token
                }
        extraHTTP = self._ordersPath % (1, yesterday, now)

        #get first response
        response = setupAPIrequest(schemeHTTP, baseHTTP, extraHTTP, headers)

        print("response index: %s status_code: %s" %(self.index, response.status_code))
        #good Responses
        if response.status_code == 200:
            payload = response.json()

            #include initial dataDict (in place, see threadSubTask)
            globals()["dataList"].extend(payload["list"])

            # Page 1 is already stored, so continue from page 2 up to AND
            # including the reported page count; range(pages) stopped one short.
            for pageNumber in range(2, payload["paging"]["pages"] + 1):
                print(f'Starting subThread index: {index} pageNumber: {pageNumber}')
                st = Thread(target=self.threadSubTask, args=(pageNumber, schemeHTTP, baseHTTP, headers, yesterday, now,))
                self.subThreads.append(st)
                st.start()

            for st in self.subThreads:
                st.join()
        #Bad Responses
        else:
            globals()["errorList"].append({'account': account, 'status_code': response.status_code, 'time': now })
    

In [7]:
def main (file):
    """Run one extraction job end to end.

    Steps:

    1. read ``auth.json`` and open a connection through
       ``getConnforMYSQL(..., "azureAccess")``;
    2. run ``ss.queryList[file]['sql_query']`` to get the account list;
    3. start one ``threadMain`` per account row and wait for all of them;
       the threads fill the module-level ``dataList`` / ``errorList``;
    4. replace every list-valued field of the collected records by a single
       string;
    5. write the records to ``ss.queryList[file]['resultSuccessTable']``
       (``if_exists='replace'``) and, when there are any, the errors to
       ``ss.queryList[file]['resultFailedTable']`` (``if_exists='append'``).

    Parameters
    ----------
    file : str
        Key into ``ss.queryList`` selecting the job configuration.

    Side effects
    ------------
    Resets the module-level globals ``dataList`` and ``errorList``.
    """
    #open auth file; the context manager closes the handle once it is parsed
    with open('auth.json') as auth:
        auth_load = json.load(auth)

    #getting conn for azure
    engine_azure = getConnforMYSQL(auth_load, "azureAccess")
    conn_azure = engine_azure.connect()

    # try/finally so the connection is released even when a step below raises
    try:
        #getData from azureDB about vTex Account list
        rows_accountList, columns_accountList = extractionFunction(conn_azure, ss.queryList[file]['sql_query'] )

        #list of main threads
        threads = []

        #start the dataList
        globals()["dataList"] = []

        #start the errorList
        globals()["errorList"] = []

        for index in range(len(rows_accountList)):
            t = threadMain(index, columns_accountList, rows_accountList, file)
            threads.append(t.getThread())

        for t in threads:
            t.join()

        #treat lists: collapse every list value into one string.
        # The record is updated directly; looking it up again with
        # dataList.index(d) cost a full scan for every list value.
        # Elements are concatenated last-to-first, as before.
        # NOTE(review): confirm the reversed order is intended.
        for d in globals()['dataList']:
            for k, v in d.items():
                if isinstance(v, list):
                    d[k] = "".join(reversed(v))
        #errorList cant have lists in the variables

        df = pd.DataFrame.from_dict(globals()["dataList"])
        df2 = pd.DataFrame.from_dict(globals()["errorList"])

        #columnsDateList = ['creationDate','ShippingEstimatedDate','ShippingEstimatedDateMax','ShippingEstimatedDateMin','authorizedDate','lastChange', 'paymentApprovedDate','readyForHandlingDate']
        #for column in columnsDateList:
        #    df[[column]]= df[[column]].apply(pd.to_datetime)

        #fillNan with empty string
        df = df.fillna("")
        df2 = df2.fillna("")

        df.to_sql(ss.queryList[file]['resultSuccessTable'], engine_azure, if_exists='replace', index=False)
        if len(globals()["errorList"]) > 0:
            df2.to_sql(ss.queryList[file]['resultFailedTable'], engine_azure, if_exists='append', index=False)
    finally:
        conn_azure.close()
    

In [8]:
if __name__ == "__main__":
    # Job name: main() uses it as the key into ss.queryList to look up the
    # SQL query, the date formats/time window and the result table names.
    file = "getAllNewOrdersVtex_30Days"
    main(file)
    # Completion marker printed after main() returns.
    print('getAllNewOrdersVtex_30Days: done')

Starting Thread index: 0
Starting Thread index: 1
Starting Thread index: 2
Starting Thread index: 3
Starting Thread index: 4
Starting Thread index: 5
Starting Thread index: 6
Starting Thread index: 7
Starting Thread index: 8
Starting Thread index: 9
Starting Thread index: 10
Starting Thread index: 11
Starting Thread index: 12
Starting Thread index: 13
Starting Thread index: 14
Starting Thread index: 15
Starting Thread index: 16
Starting Thread index: 17
response status_code:  402
response status_code:  402
response status_code: response status_code:  200
 200
response status_code:  200
response status_code:  200
response status_code:  200
response status_code:  200
response status_code:  200
Starting subThread index: 2 pageNumber: 2
Starting subThread index: 2 pageNumber: 3
response status_code:  200
response status_code:  200
Starting subThread index: 1 pageNumber: 2
Starting subThread index: 1 pageNumber: 3
response status_code:  200
Starting subThread index: 6 pageNumber: 2
response