In [None]:
import sys
import os 
import pandas as pd
import numpy as np
from datetime import datetime
import csv
import shutil
import pymysql
import pymongo
from pymongo import MongoClient
import sqlalchemy

### Data Collection

Run the following MongoDB query to unwind the collection:

In [None]:
# db.CXcommissions_etl.drop()

# db.CXcommissions_201808201535.aggregate([
# {$unwind: {path: "$TOURCODES", preserveNullAndEmptyArrays: true}},
# {$unwind: {path: "$ORIGINATING", preserveNullAndEmptyArrays: true}},
# {$unwind: {path: "$FLIGHTS", preserveNullAndEmptyArrays: true}},
# {$unwind: {path: "$POS", preserveNullAndEmptyArrays: true}},
# {$unwind: {path: "$FROM_LOCATION", preserveNullAndEmptyArrays: true}},
# {$unwind: {path: "$TO_LOCATION", preserveNullAndEmptyArrays: true}},
# {$unwind: {path: "$PCC", preserveNullAndEmptyArrays: true}},
# {$unwind: {path: "$FBC", preserveNullAndEmptyArrays: true}},
# {$project : {_id : 0 }},
# {$out:"CXcommissions_etl"}
# ]);

# db.CXcommissions_etl.find({}).count()
# 

In [None]:
client=MongoClient('local')
db = client.raxdb

In [None]:
docs = [doc for doc in db["CXcommissions_etl"].find({})]
data = pd.DataFrame(docs)

In [None]:
len(data)

In [None]:
# change columns to lower case
data.columns = [col.lower() for col in data.columns]
data

### Data Preprocessing

In [None]:
# set the pos column as the country column, drop the country and route column
data['pos']=data['country']
data.drop('country', inplace= True, axis=1)
# drop the route 
data.drop('route', axis =1, inplace=True)
data

In [None]:
# replace nan/null with empty string
data=data.fillna("")
#data.loc[]

In [None]:
#update unknown date to '9999-12-31' 
date_cols = ['sales_period_dis_date', 'sales_period_eff_date', 'tkt_dis_date', 'trv_dis_date']
for date_col in date_cols:
    update_mask = data[date_col]==""
    data.loc[update_mask, date_col]='9999-12-31'

In [None]:
# ignore
# update float, ignore row if it can't
# float_cols = ['agent_discount', 'corporate_discount']
# for index, row in data.iterrows():
#     for float_col in float_cols:
#         float_num = row[float_col]
#         data.loc[index, float_col] = float(format(float_num, '.6f'))
#         print("Updateing: ", index, float_col)

In [None]:
# ignore the following step:
# get rid of '+' and '-' in columns
# cols = ['originating']
# for index, row in data.iterrows():
#     for col in cols:
#         string = row[col]
#         if string:
#             string = str(string).replace('-', '').replace('+', '')
#             data.loc[index, col] = string
#             print("Updateing: ", index, string)

In [None]:
# update 9999 to None
cols = ['ct_fare_comm', 'rtw_fare_comm']
for col in cols:
    update_mask = data[col]==9999
    data.loc[update_mask, col]= ""

In [None]:
#replace "'BOS'" with "BOS"
data['from_location']=data['from_location'].apply(lambda x: x.replace("'", ""))
data['fbc']=data['fbc'].apply(lambda x: x.replace("'", ""))

In [None]:
# check values of a column
data['from_location'].unique()

In [None]:
# location 'OR' is in the location information, need to clean it
mask = data['from_location']=='OR'
data.drop(data.loc[mask].index, inplace=True)
mask = data['to_location']=='OR'
data.drop(data.loc[mask].index, inplace=True)
data

In [None]:
data.columns

### Data Cleaning

In [None]:
# check unique values of columns
def check_unique_values(cols=None):
    """Print the distinct values of each requested column of the global ``data``.

    Args:
        cols: column names to inspect. When falsy (``None`` or empty), every
            column of ``data`` is inspected.
    """
    selected = cols if cols else data.columns
    for name in selected:
        print("Columns: ", name)
        print(data[name].unique())
check_unique_values(['agent_discount', 'corporate_acc_id', 'corporate_discount',
       'ct_fare_comm', 'discount_unit', 'fbc', 'flights', 'pcc', 'pos', 'rbd', 'rtw_fare_comm',
       'sales_period_dis_date', 'sales_period_eff_date', 'tkt_designator',
       'tkt_dis_date', 'trv_dis_date'])

In [None]:
def update_values(data, col, old_val, update_val):
    """Overwrite, in place, every ``old_val`` found in ``data[col]`` with ``update_val``."""
    data.loc[data[col] == old_val, col] = update_val
# delete row if a column contains certain value
def delete_values(data, col, val):
    """Drop, in place, every row of ``data`` whose ``col`` equals ``val``."""
    doomed_labels = data.index[(data[col] == val).values]
    data.drop(doomed_labels, inplace=True)

In [None]:
# cleaning: (old value, new value) fixes for agent_discount, applied in the order listed
agent_discount_fixes = [
    # case1: number stored as text
    ("10", 10.0),
    # case2: percentage followed by free-text ticketing conditions
    ('3%\nALL TICKETS MUST BE ISSUED\r\n  ON/BEFORE 31DEC2016', 3.0),
    ('3% \nALL TICKETS MUST BE ISSUED\r\n  ON/BEFORE 14AUG2016', 3.0),
    ('3%\nALL TICKETS MUST BE ISSUED\r\n  ON/BEFORE 14AUG2016', 3.0),
    # case 3: percentage embedded in other text
    ('W W-TYPE 10%I II I', 10.0),
    ('R/E R/E-TYPE 5%I II I', 5.0),
]
for old_val, new_val in agent_discount_fixes:
    update_values(data, "agent_discount", old_val, new_val)
# case 4: replace US_OFFLINES TO US
update_values(data, "pos", 'US_OFFLINES', 'US')

In [None]:
# check after update
check_unique_values(['agent_discount', 'corporate_acc_id', 'corporate_discount',
       'ct_fare_comm', 'discount_unit', 'fbc', 'flights', 'pcc', 'pos', 'rbd', 'rtw_fare_comm',
       'sales_period_dis_date', 'sales_period_eff_date', 'tkt_designator',
       'tkt_dis_date', 'trv_dis_date'])

In [None]:
data.to_csv("CXcommissions_etl_v1.csv")
#data = pd.read_csv("CXcommissions_etl_v1.csv")

### Normalise the table: Create commissions, commissions_loc and commissions_loc_map 

#### split the table to main and locations, create foreign key

In [None]:
# reset index to create foreign key
data.sort_values(by=['from_location', 'to_location', 'pos'], inplace = True)
data.reset_index(drop=True, inplace = True)
data

In [None]:
data.columns

In [None]:
# Split the flat table: `locations` holds the (from, to, pos) triple and
# `commission_data` holds the commission attributes (pos is kept in both).
# `.copy()` makes both frames independent of `data`: later cells modify them
# in place (drop_duplicates, adding loc_fk), which on a plain slice triggers
# SettingWithCopyWarning and depends on view/copy semantics.
locations = data[['from_location', 'to_location', 'pos']].copy()
comm_cols = ['agent_discount', 'corporate_acc_id', 'corporate_discount',
       'ct_fare_comm', 'discount_unit', 'fbc', 'filename', 'flights',
       'originating', 'pcc', 'pos', 'rbd', 'rtw_fare_comm',
       'sales_period_dis_date', 'sales_period_eff_date', 'tkt_designator',
       'tkt_dis_date', 'tourcodes', 'trv_dis_date', '_id']
commission_data = data[comm_cols].copy()

In [None]:
# create foreign key to link between the two tables
# `data` was sorted by (from_location, to_location, pos) and re-indexed 0..n-1,
# so identical location triples are contiguous and the index label of each
# de-duplicated location row is the position where its run starts.
locations = locations.drop_duplicates()  # new frame: no in-place edit of a slice of `data`
run_starts = list(locations.index) + [len(commission_data)]
fk_lst = []
for fk, (start, stop) in enumerate(zip(run_starts[:-1], run_starts[1:])):
    # every commission row in [start, stop) shares the location with key `fk`
    fk_lst.extend([fk] * (stop - start))
# one key per commission row, otherwise the assignment in the next cell would misalign
assert len(fk_lst) == len(commission_data), "loc_fk list does not cover commission_data"
len(fk_lst)

In [None]:
locations.reset_index(drop=True, inplace=True)
locations['loc_fk'] = locations.index
commission_data['loc_fk'] = fk_lst

In [None]:
locations

#### create location mapping

In [None]:
# backup
location_copy = locations.copy()

In [None]:
# recover
locations = location_copy.copy()

In [None]:
# find unique location pos matching
location_map = locations[['from_location', 'pos']].values.tolist()+locations[['to_location', 'pos']].values.tolist()
location_map = pd.DataFrame(location_map)
location_map.drop_duplicates(inplace=True)
location_map.columns = ['location', 'pos']

In [None]:
# check location
location_map['location'].unique()

#### Faulty data found in locations: further cleaning of the location fields

JFKHKG(VIA CX888/889 ONLY) case:

In [None]:
locations.loc[locations['from_location'] == 'JFKHKG(VIA CX888/889 ONLY)']

In [None]:
# update location table 
locations.loc[locations['from_location'] == 'JFKHKG(VIA CX888/889 ONLY)', 'from_location'] = 'JFK'
locations.loc[locations['to_location'] == 'JFKHKG(VIA CX888/889 ONLY)', 'to_location'] = 'HKG'
locations.loc[locations['loc_fk'] ==57]

In [None]:
# update commission table
# The "JFKHKG(VIA CX888/889 ONLY)" rule names two flights, so each commission
# row of that location becomes two rows: the existing one gets '+CX888' and a
# copy of it gets '+CX889'.
# NOTE(review): 57 is the loc_fk displayed by the preceding cell, but later
# cells inspect 58 for the same case - confirm which key is correct.
JFK_HKG_LOC_FK = 57
fk_mask = commission_data['loc_fk'] == JFK_HKG_LOC_FK
# Guard so that re-running the cell does not append the '+CX889' copies again.
already_split = (commission_data.loc[fk_mask, 'flights'] == '+CX889').any()
if not already_split:
    dup = commission_data.loc[fk_mask].copy()  # explicit copy: safe to modify
    commission_data.loc[fk_mask, 'flights'] = '+CX888'
    dup['flights'] = '+CX889'
    commission_data = pd.concat([commission_data, dup], ignore_index=True)

other cases:

In [None]:
location_to_clean = {
    'CX AND KA': 'ALL CX AND KA',
    'CX AND KA SYSTEM': 'ALL CX AND KA',
    'SYSTEM': 'ALL CX AND KA',
    'FROM': None   
}

location_to_unwind = {
    'MEXICO ANDLATIN AMERICA': ['MEXICO', 'LATIN AMERICA'],
    'MEXICO AND LATIN AMERICA': ['MEXICO', 'LATIN AMERICA'], 
    'SHA OR HKG': ['SHA','HKG'],
    'WUH, CAN, BJS, SHA OR HKG': ['WUH', 'CAN', 'BJS', 'SHA', 'HKG']    
}
     
location_route = {
    'BOS SIN AND BOS SHA': ('BOS', ['SIN', 'SHA']),
    'BOS TO PVG': ('BOS', 'PVG'),
    'CANADA TO': ('CANADA', None),
    'CANADA TO ASIA': ('CANADA', 'ASIA'),
    'CHI OR CID TO ASIA': (['CHI','CID'], 'ASIA'),
    'LAX TO ASIA': ('LAX', 'ASIA'),
    'LAX TO ASIA ONLY': ('LAX', 'ASIA'),
    'SHA OR HKG': (['SHA', 'HKG'],['SHA', 'HKG']),
    'TOWUH, CAN, BJS,': (None, ['WUH', 'CAN', 'BJS']),
    'USA ORLATIN AMERICA - ASIA': ('USA', 'ASIA'),
    'CANADA – ASIA*': ('CANADA', 'ASIA'),
    'JFK/EWR-AISA': (['EWR', 'JFK'], 'ASIA'),
    'USA-AISA': ('USA', 'ASIA'),
    'USA TO ASIA': ('USA', 'ASIA'),
    'USA ASIA': ('USA', 'ASIA') 
}
# investigate this case: 'JFKHKG(VIA CX888/889 ONLY)'
target_cols = ['from_location', 'to_location']

In [None]:
# clean step 1: change route to from and to location
def elem_convert(elem):
    """Return ``elem`` in the scalar form stored in a single location cell.

    A list is collapsed to a space-joined string and, as a side effect, that
    string is registered in the global ``location_to_unwind`` (mapped back to
    the list). Any other value, including ``None``, is returned unchanged.
    """
    if not isinstance(elem, list):
        return elem
    joined = ' '.join(elem)
    location_to_unwind[joined] = elem
    return joined

# Rewrite route-style values ("X TO Y") into separate from/to values.
# Masks are computed on the untouched `locations`; writes go to the copy.
clean_data1 = locations.copy()
from_col, to_col = target_cols[0], target_cols[1]
for tc in target_cols:
    # the names k, v are kept: the next cell passes them to replace()
    for k, v in location_route.items():
        matched = locations[tc] == k
        if matched.any():
            clean_data1.loc[matched, from_col] = elem_convert(v[0])
            clean_data1.loc[matched, to_col] = elem_convert(v[1])

In [None]:
# cleaning step 2: replace the faulty value of a location field
def replace(k, v, tcols, locations, location_to_clean):
    """Return a copy of ``locations`` with known-bad location values substituted.

    Args:
        k, v: never read (the loop below rebinds its own key/value names);
            kept so existing calls keep working.
        tcols: names of the columns to clean.
        locations: source DataFrame; it is not modified.
        location_to_clean: mapping of faulty value -> replacement value.

    Returns:
        The cleaned copy.
    """
    cleaned = locations.copy()
    for column in tcols:
        for bad_value, good_value in location_to_clean.items():
            hits = cleaned[column] == bad_value
            cleaned.loc[hits, column] = good_value
    return cleaned
clean_loc2 = replace(k, v, target_cols, clean_data1, location_to_clean)
clean_loc2

In [None]:
# clean step 3 : unwind the field that has list
def replace_and_unwind(k, v, tc, locations_old, target_cols):
    """Expand rows whose ``tc`` value is ``k`` into one row per element of ``v``.

    Matching rows keep their position and receive ``v[0]``; for each further
    element of ``v`` a copy of the matching rows carrying that element is
    appended, and the index is renumbered. When nothing matches, an unchanged
    copy is returned.

    Args:
        k: faulty value to look for.
        v: list of replacement values.
        tc: name of the column to inspect and rewrite.
        locations_old: source DataFrame; it is not modified.
        target_cols: not used by the body.

    Returns:
        The new DataFrame.
    """
    locations_new = locations_old.copy()
    hits = locations_old[tc] == k
    if not hits.any():
        return locations_new

    first_value, other_values = v[0], v[1:]
    locations_new.loc[hits, tc] = first_value

    extra_blocks = []
    for alternative in other_values:
        block = locations_new.loc[hits].copy()
        block[tc] = alternative
        extra_blocks.append(block)

    if extra_blocks:
        locations_new = pd.concat([locations_new] + extra_blocks, ignore_index=True)
    return locations_new
# cleaning

clean_loc3 = clean_loc2.copy()
for tc in target_cols:
    for k, v in location_to_unwind.items():
        # print("---before update----", len(location_clean))
        clean_loc3= replace_and_unwind(k, v, tc, clean_loc3, target_cols)       
        # print("-------------after update--------------", len(location_clean))
        # print("----------------------------------------")

In [None]:
locations = clean_loc3

In [None]:
# recreate location map
location_map = clean_loc3[['from_location', 'pos']].values.tolist()+clean_loc3[['to_location', 'pos']].values.tolist()
location_map = pd.DataFrame(location_map)
location_map.drop_duplicates(inplace=True)
location_map.columns = ['location', 'pos']

#### Find location code mapping

In [None]:
from sqlalchemy import create_engine
# The connection URL (which embeds the password) is read from the environment
# instead of being hard-coded in the notebook, e.g.
#   GENIE_DB_URL=mysql+pymysql://<user>:<password>@localhost/genie
genie_db_url = os.environ.get('GENIE_DB_URL')
if not genie_db_url:
    raise RuntimeError("Set the GENIE_DB_URL environment variable to the SQLAlchemy URL of the genie database")
engine = create_engine(genie_db_url)
db_conn = engine.connect()
# get location code information from database
airport_city_map = pd.read_sql('SELECT airpt_cd, ctry_nm, ctry_cd, city_nm FROM genie.ref_airpt_city;', con=db_conn)
zone_map = pd.read_sql('SELECT zone, area_desc, cntry, cntry_cd FROM genie.ref_atpco_zone;', con=db_conn)
iata_city_map = pd.read_sql('SELECT airpt_cd, cntry_cd, city_cd, atpco_zone FROM genie.iata_airport_city;', con=db_conn)

#### match by airport code

In [None]:
# match airport code with location
location_map = location_map.set_index('location').join(iata_city_map.set_index('airpt_cd'), how='left')
# location_map = location_map.set_index('location').join(airport_city_map.set_index('airpt_cd'), how='left')
# create airport code column; it is blanked below for locations that are not airport codes
location_map['airpt_cd'] = location_map.index
# A location is unmatched when the left join left every joined column null.
# (An explicit null test replaces the former `sum(axis=1) == 0` over string columns.)
mask = location_map[['cntry_cd', 'city_cd', 'atpco_zone']].isna().all(axis=1)
# mask = location_map[['ctry_nm', 'ctry_cd', 'city_nm']].isna().all(axis=1)
location_map.loc[mask, 'airpt_cd'] = ''
location_map['location'] = location_map.index
location_map

In [None]:
# for the entries are matched by airport code
# if the airport code and the city code are the same, use the city code 
mask1 = location_map['airpt_cd'] != ''
mask2 = location_map['airpt_cd'] == location_map['city_cd']
location_map.loc[mask1&mask2, 'code'] = location_map['city_cd'] 
location_map.loc[mask1&mask2, 'loc_type'] = 'C'

In [None]:
# otherwise use airport
mask3 = location_map['airpt_cd'] != location_map['city_cd']
location_map.loc[mask1&mask3, 'code'] = location_map['airpt_cd']
location_map.loc[mask1&mask3, 'loc_type'] = 'P'

In [None]:
# get rid of the other useless columns
location_map.drop('cntry_cd', inplace=True, axis=1)
location_map.drop('airpt_cd', inplace=True, axis=1)
location_map.drop('atpco_zone', inplace=True, axis=1)
location_map.drop('city_cd', inplace=True, axis=1)

In [None]:
# find unmatched rows: neither a code nor a location type has been assigned
# (explicit null test instead of summing string columns and comparing with 0)
mask = location_map[['code', 'loc_type']].isna().all(axis=1)
location_map.loc[mask]

#### Manually encode these cases:

In [None]:
location_map.loc['USA', 'code'] = 'US'
location_map.loc['USA', 'loc_type'] = 'N'

location_map.loc['JKT', 'code'] = 'JKT'
location_map.loc['JKT', 'loc_type'] = 'P'

# location_map.loc['BJS', 'code'] = 'BJS'
# location_map.loc['BJS', 'loc_type'] = 'C'

# location_map.loc['CANADA', 'code'] = 'CA'
# location_map.loc['CANADA', 'loc_type'] = 'N'

# location_map.loc['MEXICO', 'code'] = 'MX'
# location_map.loc['MEXICO', 'loc_type'] = 'N'

#### solve asia case

In [None]:
# Build one extra ASIA row per entry of `zones`, each tagged with loc_type 'Z'.
# Every existing ASIA row (there may be one per pos) is repeated for every
# zone; the former `zones[index]` lookup raised IndexError as soon as ASIA
# had more than one row.
zones = ['310', '320', '330', '340']
asia_rows = location_map.loc[['ASIA']]
dup = pd.concat(
    [asia_rows.assign(code=zone, loc_type='Z') for zone in zones],
    ignore_index=True,
)
dup

In [None]:
location_map.loc['ASIA', 'code'] = '220'
location_map.loc['ASIA', 'loc_type'] = 'Z'

In [None]:
location_map = pd.concat([location_map, dup], ignore_index=True)

In [None]:
location_map

#### solve latin america case

In [None]:
# Build one extra LATIN AMERICA row per entry of `zones`, each tagged with
# loc_type 'Z'. The original loop iterated over `dup` (the ASIA frame) instead
# of `dup1`, and indexed `zones` by row number, which fails when the location
# has more than one row.
zones = ['120', '140', '160', '170']
latin_america_rows = location_map.loc[['LATIN AMERICA']]
dup1 = pd.concat(
    [latin_america_rows.assign(code=zone, loc_type='Z') for zone in zones],
    ignore_index=True,
)
dup1

In [None]:
location_map.loc['LATIN AMERICA', 'code'] = 'MX'
location_map.loc['LATIN AMERICA', 'loc_type'] = 'N'

In [None]:
location_map['location'] = location_map.index

In [None]:
location_map = pd.concat([location_map, dup1], ignore_index=True)

#### Check for unmatched locations:

In [None]:
# unmatched rows: neither a code nor a location type has been assigned
# (explicit null test instead of summing string columns and comparing with 0)
mask = location_map[['code', 'loc_type']].isna().all(axis=1)
location_map.loc[mask]

### Previous code: ignore this section and skip to "Importing the data"

In [None]:
# zone_map.drop('cntry_cd', axis=1, inplace=True)
zone_map.columns = ['zone', 'zone_desc', 'zone_country']
# turn country name to upper case
zone_map['zone_country']=zone_map['zone_country'].str.upper()
# join by country name 
location_map = location_map.join(zone_map.set_index('zone_country'), how='left')

In [None]:
mask1 = location_map[['zone', 'zone_desc']].sum(axis=1) != 0

In [None]:
# location that has been matched by zone
location_map.loc[mask1]

In [None]:
# delete some of the wrong matching
location_map = location_map.loc[location_map['zone']!='000']

In [None]:
location_map.columns

In [None]:
# find unmatch:
mask1 = location_map[['cntry_cd', 'city_cd', 'atpco_zone', 'airpt_cd']].sum(axis=1) == 0
mask2 = location_map[['zone', 'zone_desc']].sum(axis=1) == 0
location_map.loc[mask2&mask1]

In [None]:
location_map.loc['ASIA', 'zone'] = '320'
location_map.loc['BJS', 'city_cd'] = 'BJS'
location_map.loc['BJS', 'cntry_cd'] = 'CN'
location_map.loc['BJS', 'atpco_zone'] = '320'
location_map.loc['LATIN AMERICA', 'atpco_zone'] = '320'

In [None]:
location_map.loc[mask2&mask1]

In [None]:
# delete empty entry of lcoations
location_map = location_map[location_map.index!='']
location_map = location_map[location_map.index.notna()]
location_map['location'] = location_map.index
location_map

#### solve case: JFKHKG(VIA CX888/889 ONLY)

In [None]:
locations.loc[locations['from_location'] == 'JFKHKG(VIA CX888/889 ONLY)']

In [None]:
locations.loc[locations['from_location'] == 'JFKHKG(VIA CX888/889 ONLY)', ['from_location', 'to_location']] = ['JFK', 'HKG']

In [None]:
locations.iloc[58]

In [None]:
'JFK' in location_map.index and 'HKG' in location_map.index

In [None]:
commission_data.loc[commission_data['loc_fk'] == 58, ['flights']]

#### Does the commission only apply via CX888/889 (VIA CX888/889 ONLY)?

In [None]:
location_map_bu = location_map.copy()

In [None]:
# merge zone with atpco_zone
mask1 = location_map['zone'].isna()
location_map.loc[mask1, 'zone'] = ''
mask2 = location_map['atpco_zone'].isna()
location_map.loc[mask2, 'atpco_zone'] = ''
location_map['new_zone'] = location_map['zone'] + location_map['atpco_zone']
location_map

In [None]:
location_map['zone'] = location_map['new_zone']
location_map.drop('atpco_zone', inplace=True, axis=1)
location_map.drop('new_zone', inplace=True, axis=1)
location_map.drop('zone_desc', inplace=True, axis=1)

#### Unsolved cases: ALL CX, JFKHKG(VIA CX888/889 ONLY), LATIN AMERICA

In [None]:
location_map.to_csv('commissions_loc_map.csv')
commission_data.to_csv('commissions.csv')
locations.to_csv('commissions_loc.csv')

#### transform the location map 

In [None]:
# FIXME: unfinished cell. The original statement
#     match_by_city = location_map['location'] =
# has no right-hand side and is a SyntaxError, so it is disabled here until the
# intended transformation of the location map is written.

### Importing the data

#### commissions

In [None]:
# code for reading csv file directly 
# path of the csv file that stores the source files directory

# dir_path = "CXcommissions_201807051523.csv"
# dat=pd.read_csv(dir_path)
# dat.drop(['Unnamed: 0'], axis=1, inplace = True)
# dat.columns = [col.lower() for col in dat.columns]
# dat['commission_id'] = np.nan
# dat_cols = list(dat.columns)
# dat_cols.insert(0, dat_cols.pop())
# dat = dat[dat_cols]
# dat=pd.read_csv(dir_path)
# dat.drop(['Unnamed: 0'], axis=1, inplace = True)
# dat.columns = [col.lower() for col in dat.columns]

In [None]:
# check the import data
#commission_data
#location_map
#locations

In [None]:
commission_data.columns

In [None]:
commission_data['tkt_designator'].unique()

#### transform data types to match with sqlalchemy: 

In [None]:
# specify columns in each data type for commission_data
int_type = []
float_type = ['agent_discount', 'corporate_discount']
# all four date columns; 'trv_dis_date' was previously in none of the lists,
# so it was never converted
date_type = ['sales_period_dis_date', 'sales_period_eff_date', 'tkt_dis_date', 'trv_dis_date']
# 'country' is no longer listed: that column is dropped during preprocessing
str_type = ['loc_fk', 'pcc', 'ct_fare_comm', 'rtw_fare_comm', 'discount_unit', 'fbc', 'filename', 'flights',
            'originating', 'tkt_designator','corporate_acc_id', 'pos', 'rbd', 'tourcodes', '_id']

In [None]:
len(str_type)+len(date_type)+len(float_type) == len(commission_data.columns)

In [None]:
from datetime import datetime
# data type mapper+transformation function
def type_mapper(col, index, row):
    """Convert ``row[col]`` to the Python type registered for ``col``.

    The target type is chosen by membership of ``col`` in the module-level
    lists ``int_type``, ``float_type``, ``str_type`` and ``date_type``; a
    column listed in none of them is left untouched.

    Args:
        col: column name (key of ``row``) to convert.
        index: row label; not used by the body.
        row: mutable mapping of column name -> value, updated in place.

    Returns:
        ``(no_error, row)``. ``no_error`` is False when the value could not be
        converted; the column name and the offending value are printed.
    """
    no_error = True
    dat = row[col]
    try:
        if col in int_type:
            # Was float(dat), which handed floats to integer columns.
            number = float(dat)
            if not number.is_integer():
                raise ValueError("non-integral value for an integer column")
            row[col] = int(number)
        if col in float_type:
            row[col] = float(dat)
        if col in str_type:
            row[col] = str(dat)
        if col in date_type:
            # dates are expected as 'YYYY-MM-DD' text
            row[col] = datetime.strptime(dat, '%Y-%m-%d').date()
    except (TypeError, ValueError):
        no_error = False
        print(col, row[col])
    return no_error, row

In [None]:
from datetime import datetime, timedelta
# Turn each commission_data row into a dict ready for import: blank-like
# values become "", every other value goes through type_mapper, and a row
# with any failed conversion is skipped.
commissions = []
for index, record in commission_data.iterrows():
    record = dict(record)
    to_convert = []
    for col in commission_data.columns:
        value = record[col]
        if str(value) == 'nan' or value == '' or value == 'None':
            record[col] = ""
        else:
            to_convert.append(col)
    row_ok = True
    for col in to_convert:
        col_ok, record = type_mapper(col, index, record)
        row_ok = row_ok and col_ok
    if not row_ok:
        print("Skip row due to conversion error: ", index)
        continue
    commissions.append(record)

In [None]:
# add run id
# `import datetime` binds the module, which has no .now(); the class is needed.
from datetime import datetime
time = datetime.now()
# One id for the whole batch, formatted YYYYMMDDHHMM (12 characters, the width
# of the run_id column). The former '%Y%m%H%M' omitted the day.
run_id = time.strftime('%Y%m%d%H%M')
for com in commissions:
    com['run_id'] = run_id

In [None]:
from sqlalchemy import create_engine
# The connection URL (which embeds the password) is read from the environment
# instead of being hard-coded in the notebook, e.g.
#   COMMISSIONS_DB_URL=mysql+pymysql://<user>:<password>@localhost/<database>
# NOTE(review): the former literal contained two '@' and a two-segment path
# ('genie/cx_dw') - confirm the intended target database.
commissions_db_url = os.environ.get('COMMISSIONS_DB_URL')
if not commissions_db_url:
    raise RuntimeError("Set the COMMISSIONS_DB_URL environment variable to the SQLAlchemy URL of the target database")
engine = create_engine(commissions_db_url)

In [None]:
from sqlalchemy.ext.declarative import declarative_base
from  sqlalchemy.schema import Column
from sqlalchemy import *

# create table in database
Base = declarative_base()
class Commissions(Base):
    """ORM mapping for the ``commissions`` table (one row per commission record).

    The primary key is the composite (flights, _id, run_id). ``loc_fk`` holds
    the location key generated earlier in this notebook; no ForeignKey
    constraint is declared for it here.
    """
    __tablename__="commissions"
    # NOTE(review): Numeric(6) passes only a precision, no scale - confirm
    # that fractional discounts are stored as intended.
    agent_discount = Column(Numeric(6))
    corporate_discount = Column(Numeric(6))
    # country = Column(String(4))
    ct_fare_comm = Column(String(10))
    discount_unit = Column(String(4)) 
    fbc = Column(String(7))
    filename = Column(String(200))
    # NOTE(review): declared both nullable=True and primary_key=True - confirm.
    flights = Column(String(100),nullable=True, primary_key=True) 
    # from_location = Column(String(100))
    originating = Column(String(50)) 
    pos = Column(String(50)) 
    rbd = Column(String(4)) 
    rtw_fare_comm = Column(String(10))
    sales_period_dis_date = Column(Date) 
    sales_period_eff_date = Column(Date) 
    tkt_dis_date = Column(Date)
    tourcodes = Column(String(100)) 
    # to_location = Column(String(100)) 
    trv_dis_date = Column(Date)
    pcc = Column(String(10),nullable=True) 
    tkt_designator = Column(String(50),nullable=True)  
    corporate_acc_id = Column(String(20),nullable=True)
    # source document id, part of the primary key
    _id = Column(String(40), primary_key=True)
    # id of the import run, part of the primary key
    run_id = Column(String(12), primary_key=True)
    loc_fk = Column(Integer,)
# create commissions table if not exists
Commissions.__table__.create(bind=engine, checkfirst=True)

In [None]:
from sqlalchemy.orm.session import sessionmaker
# Stage one Commissions object per converted record, then commit them together.
Session = sessionmaker(bind=engine)
session = Session()
session.add_all(Commissions(**com) for com in commissions)
session.commit()

### Import locations into MySQL

In [None]:
locations.head()

In [None]:
locations.columns

In [None]:
from sqlalchemy.ext.declarative import declarative_base
from  sqlalchemy.schema import Column
from sqlalchemy import *

# create table in database
Base = declarative_base()
class Locations(Base):
    """ORM mapping for the ``commissions_loc`` table (one row per location triple).

    All four columns together form the composite primary key.
    """
    __tablename__="commissions_loc"
    # NOTE(review): the three text columns are declared nullable=True and
    # primary_key=True at the same time - confirm.
    from_location = Column(String(100), nullable=True, primary_key=True)
    pos = Column(String(50), nullable=True, primary_key=True) 
    to_location = Column(String(100), nullable=True, primary_key=True) 
    # location key generated earlier in this notebook
    loc_fk = Column(Integer, primary_key=True)
# create commissions table if not exists
Locations.__table__.create(bind=engine, checkfirst=True)

In [None]:
int_type = ['loc_fk']
str_type = ['from_location', 'to_location', 'pos']
date_type = []
float_type = []

In [None]:
# transform the type
from datetime import datetime, timedelta
# Turn each locations row into a dict ready for import: blank-like values
# become None, every other value goes through type_mapper, and a row with any
# failed conversion is skipped.
commissions_loc = []
for index, record in locations.iterrows():
    record = dict(record)
    to_convert = []
    for col in locations.columns:
        value = record[col]
        if str(value) == 'nan' or value == '' or value == 'None':
            record[col] = None
        else:
            to_convert.append(col)
    row_ok = True
    for col in to_convert:
        col_ok, record = type_mapper(col, index, record)
        row_ok = row_ok and col_ok
    if not row_ok:
        print("Skip row due to conversion error: ", index, record)
        continue
    commissions_loc.append(record)

In [None]:
len(commissions_loc)

In [None]:
# import the data
from sqlalchemy.orm.session import sessionmaker
# Stage one Locations object per converted record, then commit them together.
Session = sessionmaker(bind=engine)
session = Session()
session.add_all(Locations(**com_loc) for com_loc in commissions_loc)
session.commit()

### Import location_map into MySQL

In [None]:
# specify columns in each data type for commission_data
int_type = []
float_type = []
str_type = ['pos', 'loc_type', 'code', 'location']

In [None]:
Base = declarative_base()
class Maps(Base):
    """ORM mapping for the ``commissions_loc_map`` table.

    Each row pairs a (pos, location) with a ``code`` and a one-character
    ``loc_type``; all four columns together form the composite primary key.
    """
    __tablename__="commissions_loc_map"
    # NOTE(review): every column is declared nullable=True and
    # primary_key=True at the same time - confirm.
    pos = Column(String(10), nullable=True, primary_key=True)
    location = Column(String(50), nullable=True, primary_key=True)
    code = Column(String(3), nullable=True, primary_key=True)
    loc_type = Column(String(1), nullable=True, primary_key=True)
# create commissions table if not exists
Maps.__table__.create(bind=engine, checkfirst=True)

In [None]:
from datetime import datetime, timedelta
# Turn each location_map row into a dict ready for import: blank-like values
# become None, every other value goes through type_mapper, and a row with any
# failed conversion is skipped.
commissions_map = []
for index, record in location_map.iterrows():
    record = dict(record)
    to_convert = []
    for col in location_map.columns:
        value = record[col]
        if str(value) == 'nan' or value == '' or value == 'None' or value == None:
            record[col] = None
        else:
            to_convert.append(col)
    row_ok = True
    for col in to_convert:
        col_ok, record = type_mapper(col, index, record)
        row_ok = row_ok and col_ok
    if not row_ok:
        print("Skip row due to conversion error: ", index, record)
        continue
    commissions_map.append(record)

In [None]:
from sqlalchemy.orm.session import sessionmaker
# Insert one Maps row per converted record.
# (The `for` statement was split across two lines, which is a SyntaxError.)
Session = sessionmaker(bind=engine)
session = Session()
try:
    for com_loc in commissions_map:
        row = Maps(**com_loc)
        session.add(row)
    session.commit()
except Exception:
    # leave the session usable and the table free of a partial batch
    session.rollback()
    raise

### Data import with csv file

In [None]:
file_path = "CXcommissions_etl.csv"
schema = "zz_jc"
table_name = "commissions"

In [None]:
load_sql = """LOAD DATA LOCAL INFILE 'CXcommissions_etl.csv' INTO TABLE zz_jc.commissions_etl FIELDS TERMINATED BY ',' ENCLOSED BY '"' IGNORE 1 LINES;"""

In [None]:
def connect_store_commissions(hostname, username, password, database):

    '''
    Reload zz_jc.commissions_etl from a csv file.

    Connects to MySQL, truncates zz_jc.commissions_etl, executes the
    module-level load_sql statement and commits the load.

    hostname, username, password: connection parameters handed to pymysql.
    database: not used; both statements name the table with its schema.

    On any failure the error is printed and the process exits with status 1
    (SystemExit). The connection is closed in every case.
    '''
    con = None
    try:
        con = pymysql.connect(host=hostname,
                                user=username,
                                password=password,
                                local_infile=1)

        print('Connected to DB: {}'.format(hostname))
        # Create cursor and execute Load SQL
        cursor = con.cursor()
        truncate_Stmt = "truncate zz_jc.commissions_etl"
        cursor.execute(truncate_Stmt)
        cursor.execute(load_sql)
        # pymysql connections do not autocommit by default: without this the
        # loaded rows are discarded when the connection is closed.
        con.commit()
        print('Succuessfully loaded the table from csv.')

    except Exception as e:
        print('Error: {}'.format(str(e)))
        sys.exit(1)
    finally:
        # close even when connecting succeeded but a statement failed
        if con is not None:
            con.close()

connect_store_commissions(hostname, username, password, database)

