# ETLs

## Imports

In [34]:
import pandas as pd
import sqlite3
import pyodbc

select_tables = "SELECT name FROM sqlite_master WHERE type='table'"

sales_con = sqlite3.connect("go_sales.sqlite")
sales_tables = pd.read_sql_query(select_tables, sales_con)

sales_country       = pd.read_sql_query("SELECT * FROM country;", sales_con)
order_details       = pd.read_sql_query("SELECT * FROM order_details;", sales_con)
order_header        = pd.read_sql_query("SELECT * FROM order_header;", sales_con)
order_method        = pd.read_sql_query("SELECT * FROM order_method;", sales_con)
product             = pd.read_sql_query("SELECT * FROM product;", sales_con)
product_line        = pd.read_sql_query("SELECT * FROM product_line;", sales_con)
product_type        = pd.read_sql_query("SELECT * FROM product_type;", sales_con)
sales_retailer_site = pd.read_sql_query("SELECT * FROM retailer_site;", sales_con)
return_reason       = pd.read_sql_query("SELECT * FROM return_reason;", sales_con)
returned_item       = pd.read_sql_query("SELECT * FROM returned_item;", sales_con)
sales_branch        = pd.read_sql_query("SELECT * FROM sales_branch;", sales_con)
sales_staff         = pd.read_sql_query("SELECT * FROM sales_staff;", sales_con)
SALES_TARGETData    = pd.read_sql_query("SELECT * FROM SALES_TARGETData;", sales_con)
sqlite_sequence     = pd.read_sql_query("SELECT * FROM sqlite_sequence;", sales_con)
print("Imported sales tables")

staff_con = sqlite3.connect("go_staff.sqlite")
staff_tables = pd.read_sql_query(select_tables, staff_con)

course            = pd.read_sql_query("SELECT * FROM course;", staff_con)
sales_branch      = pd.read_sql_query("SELECT * FROM sales_branch;", staff_con)
sales_staff       = pd.read_sql_query("SELECT * FROM sales_staff;", staff_con)
satisfaction      = pd.read_sql_query("SELECT * FROM satisfaction;", staff_con)
satisfaction_type = pd.read_sql_query("SELECT * FROM satisfaction_type;", staff_con)
training          = pd.read_sql_query("SELECT * FROM training;", staff_con)
print("Imported staff tables")

crm_con = sqlite3.connect("go_crm.sqlite")
crm_tables = pd.read_sql_query(select_tables, crm_con)
                           
age_group             = pd.read_sql_query("SELECT * FROM age_group;", crm_con)
crm_country           = pd.read_sql_query("SELECT * FROM country;", crm_con)
retailer              = pd.read_sql_query("SELECT * FROM retailer;", crm_con)
retailer_contact      = pd.read_sql_query("SELECT * FROM retailer_contact;", crm_con)
retailer_headquarters = pd.read_sql_query("SELECT * FROM retailer_headquarters;", crm_con)
retailer_segment      = pd.read_sql_query("SELECT * FROM retailer_segment;", crm_con)
crm_retailer_site     = pd.read_sql_query("SELECT * FROM retailer_site;", crm_con)
retailer_type         = pd.read_sql_query("SELECT * FROM retailer_type;", crm_con)
sales_demographic     = pd.read_sql_query("SELECT * FROM sales_demographic;", crm_con)
sales_territory       = pd.read_sql_query("SELECT * FROM sales_territory;", crm_con)
print("Imported crm tables")

inventory_level = pd.read_csv("GO_SALES_INVENTORY_LEVELSData.csv")
print("Imported inventory table")

sales_forecast = pd.read_csv("GO_SALES_PRODUCT_FORECASTData.csv")
print("Imported sales product forecast table")

Imported sales tables
Imported staff tables
Imported crm tables
Imported inventory table
Imported sales product forecast table


## SQL Server connection

In [35]:
servername = 'DESKTOP-9F8A8PF\\MSSQLSERVER01'
database = 'Datawarehouse'

sql_server_conn = pyodbc.connect(f"DRIVER={{SQL Server}};SERVER={servername};DATABASE={database};Trusted_Connection=yes")
cursor = sql_server_conn.cursor()

try:
    cursor.execute("SELECT * FROM Test")
    test = cursor.fetchall()
    print(test)
except pyodbc.Error as e:
    print(e)

[(1, 'test      '), (2, 'test2     ')]


In [36]:
"""
Flexible method to merge two tables
- NaN values of one dataframe can be filled by the other dataframe
- Uses all available columns
- Errors when a row of the two dataframes doesn't match (df1 has 'A' and df2 has 'B' in row)
"""
def merge_tables(df1, df2, index_col):
    # Ensure 'CODE' is set as the index for both DataFrames
    if index_col not in df1.columns or index_col not in df2.columns:
        raise KeyError(f"{index_col} must be a column in both DataFrames.")
    
    df1 = df1.set_index(index_col)
    df2 = df2.set_index(index_col)

    # Identify common and exclusive columns
    common_columns = df1.columns.intersection(df2.columns)
    exclusive_df1 = df1.columns.difference(df2.columns)
    exclusive_df2 = df2.columns.difference(df1.columns)

    # Concatenate exclusive columns from each DataFrame onto the other
    df1_combined = pd.concat([df1, df2[exclusive_df2]], axis=1, sort=False)
    df2_combined = pd.concat([df2, df1[exclusive_df1]], axis=1, sort=False)

    # Resolve common columns with nulls and conflicts
    for col in common_columns:
        # Align the Series from both DataFrames for comparison
        series1, series2 = df1_combined[col].align(df2_combined[col])

        # Check for conflicts (non-null values that do not match)
        conflict_mask = (~series1.isnull() & ~series2.isnull() & (series1 != series2))
        if conflict_mask.any():
            raise ValueError(f"Merge failed due to conflict in column '{col}'")

        # Use values from df2 where df1 is null (prioritizing df1 values)
        df1_combined[col] = series1.combine_first(series2)

    return df1_combined

# Merge duplicate tables into single table
retailer_site = merge_tables(sales_retailer_site, crm_retailer_site, 'RETAILER_SITE_CODE')
# Column name mismatch
sales_country = sales_country.rename(columns={'COUNTRY': 'COUNTRY_EN'})
country = merge_tables(sales_country, crm_country, 'COUNTRY_CODE')

## Utilities

In [37]:
rename_mapping = {
    'ACTIVE_INDICATOR': 'ACTIVE_INDICATOR_bool',
    'ADDRESS1': 'ADDRESS1_address',
    'ADDRESS2': 'ADDRESS2_address',
    'CITY': 'CITY_name',
    'COMPANY_NAME': 'COMPANY_name',
    'COUNTRY_CODE': 'COUNTRY_id',
    'COUNTRY_EN': 'COUNTRY_name',
    'COURSE_CODE': 'COURSE_id',
    'COURSE_DESCRIPTION': 'COURSE_description',
    'COUNTRY_LANGUAGE_code': 'COUNTRY_LANGUAGE_id',
    'CURRENCY_NAME': 'CURRENCY_name',
    'DATE_HIRED': 'DATE_HIRED_date',
    'DESCRIPTION': 'PRODUCT_description',
    'EMAIL': 'EMAIL_address',
    'EXPECTED_VOLUME': 'EXPECTED_VOLUME_number',
    'EXTENSION': 'EXTENSION_number',
    'E_MAIL': 'EMAIL_address',
    'FAX': 'FAX_phone',
    'FIRST_NAME': 'FIRST_NAME_name',
    'FLAG_IMAGE': 'FLAG_image',
    'GENDER': 'GENDER_char',
    'INTRODUCTION_DATE': 'PRODUCT_INTRODUCTION_DATE_date',
    'JOB_POSITION_EN': 'JOB_POSITION_name',
    'LANGUAGE': 'LANGUAGE_name',
    'LAST_NAME': 'LAST_NAME_name',
    'MANAGER_CODE': 'MANAGER_id',
    'MARGIN': 'PRODUCT_MARGIN_percentage',
    'MONTH': 'MONTH_number',
    'ORDER_DATE': 'ORDER_DATE_date',
    'ORDER_DETAIL_CODE': 'ORDER_DETAIL_id',
    'ORDER_METHOD_CODE': 'ORDER_METHOD_id',
    'ORDER_METHOD_EN': 'ORDER_METHOD_name',
    'ORDER_NUMBER': 'ORDER_TABLE_id',
    'PHONE': 'PHONE_phone',
    'POSITION_EN': 'POSITION_name',
    'POSTAL_ZONE': 'POSTAL_ZONE_code',
    'PRODUCTION_COST': 'PRODUCT_PRODUCTION_COST_money',
    'PRODUCT_IMAGE': 'PRODUCT_image',
    'PRODUCT_LINE_CODE': 'PRODUCT_LINE_id',
    'PRODUCT_LINE_EN': 'PRODUCT_LINE_name',
    'PRODUCT_NAME': 'PRODUCT_name',
    'PRODUCT_NUMBER': 'PRODUCT_id',
    'QUANTITY': 'QUANTITY_number',
    'REGION': 'REGION_name',
    'RETAILER_CODE': 'RETAILER_id',
    'RETAILER_CODEMR': 'RETAILER_MR_id',
    'RETAILER_CONTACT_CODE': 'RETAILER_CONTACT_id',
    'RETAILER_NAME': 'RETAILER_name',
    'RETAILER_SITE_CODE': 'RETAILER_SITE_id',
    'RETAILER_TYPE_CODE': 'RETAILER_TYPE_id',
    'RETAILER_TYPE_EN': 'RETAILER_TYPE_name',
    'RETURN_CODE': 'RETURNS_id',
    'RETURN_DATE': 'RETURN_DATE_date',
    'RETURN_DESCRIPTION_EN': 'RETURN_REASON_description',
    'RETURN_QUANTITY': 'RETURN_QUANTITY_number',
    'RETURN_REASON_CODE': 'RETURN_REASON_id',
    'SALES_BRANCH_CODE': 'SALES_BRANCH_id',
    'SALES_STAFF_CODE': 'SALES_STAFF_id',
    'SALES_TERRITORY_CODE': 'SALES_TERRITORY_id',
    'SATISFACTION_TYPE_CODE': 'SATISFACTION_TYPE_id',
    'SATISFACTION_TYPE_DESCRIPTION': 'SATISFACTION_TYPE_description',
    'SEGMENT_CODE': 'SEGMENT_code',
    'SEGMENT_LANGUAGE_code': 'SEGMENT_LANGUAGE_id',
    'TERRITORY_NAME_EN': 'TERRITORY_name',
    'UNIT_COST': 'UNIT_COST_money',
    'UNIT_PRICE': 'UNIT_PRICE_money',
    'UNIT_SALE_PRICE': 'UNIT_SALE_PRICE_money',
    'WORK_PHONE': 'WORK_PHONE_phone',
    'YEAR': 'YEAR_number'
}

# List of all vetted columns
valid_columns = list(rename_mapping.values())

# Filters out all columns of dataframe that aren't typed
def filterColumns(dataframe):
    valid_columns_set = set(valid_columns)
    actual_columns_set = set(dataframe.columns)
    intersection_columns = list(actual_columns_set.intersection(valid_columns_set))

    # Use the intersection result to filter columns from dataframe
    return dataframe[intersection_columns]

# Filters out all columns of dataframe that aren't typed
def excludeColumns(dataframe, column_names):
    return dataframe[dataframe.columns.difference(column_names)]

def sizeCheck(dataframe, expected_column_count):
    actual_column_count = len(dataframe.columns)
    if actual_column_count == expected_column_count:
        print(f'Table has {expected_column_count} columns')
    else:
        raise Exception(f'Table has {actual_column_count} columns, expected {expected_column_count}')


column_types = {
    'name': 'NVARCHAR(80)',
    'image': 'NVARCHAR(60)',
    'id': 'INT',
    'description': 'NTEXT',
    'money': 'DECIMAL(19,4)',
    'percentage': 'DECIMAL(12,12)',
    'date': 'NVARCHAR(30)',
    'code': 'NVARCHAR(40)',
    'char': 'CHAR(1)',
    'number': 'INT',
    'phone': 'NVARCHAR(30)',
    'address': 'NVARCHAR(80)',
    'bool': 'BIT',
}


def getTypes():
    types = {}
    for column in rename_mapping.values():
        column_type = column.rsplit('_', 1)[1]
        types[column_type] = ''
    return types

def columnType(column_name):
    err = ''
    try:
        return column_types[column_name.rsplit('_', 1)[1]]
    except IndexError:
        err = "Column name doesn't contain a type"
    except KeyError:
        err = "Column type not found"
    raise Exception(err)

# def columnValue(column_name):
    
#     if columnType(column_name) in va

def createTable(dataframe, PK):
    # Primary key with the type extension removed
    # Manual labor isn't worth it!
    tablename = PK.rsplit('_', 1)[0]

    # Add Primary Key as first column
    columns = f'{PK} {columnType(PK)} NOT NULL PRIMARY KEY'

    # Add all the other columns
    for column in dataframe.columns:
        if column != PK: # PK is already added
            columns += f', {column} {columnType(column)}'

    # Create the command
    command = f"CREATE TABLE {tablename} ({columns})"

    print(command)

    try:
        cursor.execute(command)
        cursor.commit()
    except pyodbc.Error as e:
        if 'There is already an object named' in str(e):
            print('Table already exists in database')
        else:
            raise(e)

def insertTable(dataframe, PK):
    # Primary key with the type extension removed
    # Manual labor isn't worth it!
    tablename = PK.rsplit('_', 1)[0]

    # Add Primary Key as first column
    columns = PK
    
    # Add all the other columns
    for column in dataframe.columns:
        if column != PK: # PK is already added
            columns += f', {column}'

    
    # Execute inserts
    for i, row in dataframe.iterrows():
        values = ''
        values += str(row[PK])

        for column in dataframe.columns:
            if column != PK: # PK is already added
                try:
                    val = str(row[column]).replace("'","''")
                    values += f", '{val}'"
                except AttributeError:
                    values += f", NULL"

        command = f"INSERT INTO {tablename} ({columns}) VALUES ({values});\n"
        
        try:
            cursor.execute(command)
            cursor.commit()
        except pyodbc.Error as e:
            if 'There is already an object named' in str(e):
                print('Table already exists in database')
            else:
                print(command)
            

column_types

{'name': 'NVARCHAR(80)',
 'image': 'NVARCHAR(60)',
 'id': 'INT',
 'description': 'NTEXT',
 'money': 'DECIMAL(19,4)',
 'percentage': 'DECIMAL(12,12)',
 'date': 'NVARCHAR(30)',
 'code': 'NVARCHAR(40)',
 'char': 'CHAR(1)',
 'number': 'INT',
 'phone': 'NVARCHAR(30)',
 'address': 'NVARCHAR(80)',
 'bool': 'BIT'}

## Product ETL

In [38]:
# Merge
product_etl = pd.merge(product, product_type, on="PRODUCT_TYPE_CODE")
product_etl = pd.merge(product_etl, product_line, on="PRODUCT_LINE_CODE")

# Rename
product_etl = product_etl.rename(columns=rename_mapping)

# Exclude
product_etl = filterColumns(product_etl)

# Assert
sizeCheck(product_etl,10)
product_etl

# Create
createTable(product_etl, 'PRODUCT_id')
insertTable(product_etl, 'PRODUCT_id')



Table has 10 columns
CREATE TABLE PRODUCT (PRODUCT_id INT NOT NULL PRIMARY KEY, LANGUAGE_name NVARCHAR(80), PRODUCT_PRODUCTION_COST_money DECIMAL(19,4), PRODUCT_description NTEXT, PRODUCT_image NVARCHAR(60), PRODUCT_MARGIN_percentage DECIMAL(12,12), PRODUCT_LINE_name NVARCHAR(80), PRODUCT_INTRODUCTION_DATE_date NVARCHAR(30), PRODUCT_name NVARCHAR(80), PRODUCT_LINE_id INT)


## Sales Staff ETL

In [39]:
# Merge
sales_staff_etl = pd.merge(sales_staff, sales_branch, on='SALES_BRANCH_CODE')
sales_staff_etl = pd.merge(sales_staff_etl, country, on='COUNTRY_CODE')
sales_staff_etl = pd.merge(sales_staff_etl, sales_territory, on='SALES_TERRITORY_CODE')

# Rename
sales_staff_etl = sales_staff_etl.rename(columns=rename_mapping)

# Exclude
sales_staff_etl = filterColumns(sales_staff_etl)

# Assert
sizeCheck(sales_staff_etl,23)
sales_staff_etl

# Create
createTable(sales_staff_etl, 'SALES_STAFF_id')
insertTable(sales_staff_etl, 'SALES_STAFF_id')

Table has 23 columns
CREATE TABLE SALES_STAFF (SALES_STAFF_id INT NOT NULL PRIMARY KEY, POSITION_name NVARCHAR(80), FLAG_image NVARCHAR(60), ADDRESS1_address NVARCHAR(80), SALES_BRANCH_id INT, DATE_HIRED_date NVARCHAR(30), SALES_TERRITORY_id INT, EMAIL_address NVARCHAR(80), LAST_NAME_name NVARCHAR(80), MANAGER_id INT, FAX_phone NVARCHAR(30), REGION_name NVARCHAR(80), COUNTRY_id INT, CITY_name NVARCHAR(80), LANGUAGE_name NVARCHAR(80), FIRST_NAME_name NVARCHAR(80), COUNTRY_name NVARCHAR(80), CURRENCY_name NVARCHAR(80), WORK_PHONE_phone NVARCHAR(30), TERRITORY_name NVARCHAR(80), POSTAL_ZONE_code NVARCHAR(40), EXTENSION_number INT, ADDRESS2_address NVARCHAR(80))
INSERT INTO SALES_STAFF (SALES_STAFF_id, POSITION_name, FLAG_image, ADDRESS1_address, SALES_BRANCH_id, DATE_HIRED_date, SALES_TERRITORY_id, EMAIL_address, LAST_NAME_name, MANAGER_id, FAX_phone, REGION_name, COUNTRY_id, CITY_name, LANGUAGE_name, FIRST_NAME_name, COUNTRY_name, CURRENCY_name, WORK_PHONE_phone, TERRITORY_name, POSTAL_Z

## Satisfaction type ETL

In [40]:
# Rename
satisfaction_type_etl = satisfaction_type.rename(columns=rename_mapping)

# Exclude
satisfaction_type_etl = filterColumns(satisfaction_type_etl)

# Assert
sizeCheck(satisfaction_type_etl,2)
satisfaction_type_etl

# Create
createTable(satisfaction_type_etl, 'SATISFACTION_TYPE_id')
insertTable(satisfaction_type_etl, 'SATISFACTION_TYPE_id')

Table has 2 columns
CREATE TABLE SATISFACTION_TYPE (SATISFACTION_TYPE_id INT NOT NULL PRIMARY KEY, SATISFACTION_TYPE_description NTEXT)


## Course ETL

In [41]:
# Rename
course_etl = course.rename(columns=rename_mapping)

# Exclude
course_etl = filterColumns(course_etl)

# Assert
sizeCheck(course_etl,2)
course_etl

# Create
createTable(course_etl, 'COURSE_id')
insertTable(course_etl, 'COURSE_id')

Table has 2 columns
CREATE TABLE COURSE (COURSE_id INT NOT NULL PRIMARY KEY, COURSE_description NTEXT)


## Sales Forecast ETL

In [42]:
# Rename
sales_forecast_etl = sales_forecast.rename(columns=rename_mapping)

# Exclude
sales_forecast_etl = filterColumns(sales_forecast_etl)

# Assert
sizeCheck(sales_forecast_etl,4)
sales_forecast_etl

# Create
createTable(sales_forecast_etl, 'PRODUCT_id')
insertTable(sales_forecast_etl, 'PRODUCT_id')

Table has 4 columns
CREATE TABLE PRODUCT (PRODUCT_id INT NOT NULL PRIMARY KEY, MONTH_number INT, YEAR_number INT, EXPECTED_VOLUME_number INT)
Table already exists in database
INSERT INTO PRODUCT (PRODUCT_id, MONTH_number, YEAR_number, EXPECTED_VOLUME_number) VALUES (44, '12', '2022', '383');

INSERT INTO PRODUCT (PRODUCT_id, MONTH_number, YEAR_number, EXPECTED_VOLUME_number) VALUES (45, '1', '2021', '80');

INSERT INTO PRODUCT (PRODUCT_id, MONTH_number, YEAR_number, EXPECTED_VOLUME_number) VALUES (45, '2', '2021', '51');

INSERT INTO PRODUCT (PRODUCT_id, MONTH_number, YEAR_number, EXPECTED_VOLUME_number) VALUES (45, '3', '2021', '214');

INSERT INTO PRODUCT (PRODUCT_id, MONTH_number, YEAR_number, EXPECTED_VOLUME_number) VALUES (45, '4', '2021', '300');

INSERT INTO PRODUCT (PRODUCT_id, MONTH_number, YEAR_number, EXPECTED_VOLUME_number) VALUES (45, '5', '2021', '141');

INSERT INTO PRODUCT (PRODUCT_id, MONTH_number, YEAR_number, EXPECTED_VOLUME_number) VALUES (45, '6', '2021', '210');



## Retailer Contact ETL

In [43]:
# Merge
retailer_contact_etl = pd.merge(retailer_contact, retailer_site, on='RETAILER_SITE_CODE')
retailer_contact_etl = pd.merge(retailer_contact_etl, country, on='COUNTRY_CODE')
retailer_contact_etl = pd.merge(retailer_contact_etl, sales_territory, on='SALES_TERRITORY_CODE')\
    
# Rename 
retailer_contact_etl = retailer_contact_etl.rename(columns=rename_mapping)

# Exclude
retailer_contact_etl = filterColumns(retailer_contact_etl)

# Assert
sizeCheck(retailer_contact_etl,23)
retailer_contact_etl

# Create
createTable(retailer_contact_etl, 'RETAILER_CONTACT_id')
insertTable(retailer_contact_etl, 'RETAILER_CONTACT_id')

Table has 23 columns
CREATE TABLE RETAILER_CONTACT (RETAILER_CONTACT_id INT NOT NULL PRIMARY KEY, RETAILER_id INT, JOB_POSITION_name NVARCHAR(80), FLAG_image NVARCHAR(60), ADDRESS1_address NVARCHAR(80), SALES_TERRITORY_id INT, EMAIL_address NVARCHAR(80), LAST_NAME_name NVARCHAR(80), GENDER_char CHAR(1), FAX_phone NVARCHAR(30), REGION_name NVARCHAR(80), COUNTRY_id INT, CITY_name NVARCHAR(80), LANGUAGE_name NVARCHAR(80), FIRST_NAME_name NVARCHAR(80), COUNTRY_name NVARCHAR(80), CURRENCY_name NVARCHAR(80), TERRITORY_name NVARCHAR(80), POSTAL_ZONE_code NVARCHAR(40), ACTIVE_INDICATOR_bool BIT, EXTENSION_number INT, ADDRESS2_address NVARCHAR(80), RETAILER_SITE_id INT)
INSERT INTO RETAILER_CONTACT (RETAILER_CONTACT_id, RETAILER_id, JOB_POSITION_name, FLAG_image, ADDRESS1_address, SALES_TERRITORY_id, EMAIL_address, LAST_NAME_name, GENDER_char, FAX_phone, REGION_name, COUNTRY_id, CITY_name, LANGUAGE_name, FIRST_NAME_name, COUNTRY_name, CURRENCY_name, TERRITORY_name, POSTAL_ZONE_code, ACTIVE_INDI

INSERT INTO RETAILER_CONTACT (RETAILER_CONTACT_id, RETAILER_id, JOB_POSITION_name, FLAG_image, ADDRESS1_address, SALES_TERRITORY_id, EMAIL_address, LAST_NAME_name, GENDER_char, FAX_phone, REGION_name, COUNTRY_id, CITY_name, LANGUAGE_name, FIRST_NAME_name, COUNTRY_name, CURRENCY_name, TERRITORY_name, POSTAL_ZONE_code, ACTIVE_INDICATOR_bool, EXTENSION_number, ADDRESS2_address, RETAILER_SITE_id) VALUES (115, '109', 'District Manager', 'F03', '1700 George Street', '1', 'AngelaP@zxyznet.net', 'Pathak', 'F', '1 (415) 642-8746', 'California', '3', 'San Francisco', 'EN', 'Angela', 'United States', 'dollars', 'Americas', '94127', '1', 'None', 'Suite 1B', '103');

INSERT INTO RETAILER_CONTACT (RETAILER_CONTACT_id, RETAILER_id, JOB_POSITION_name, FLAG_image, ADDRESS1_address, SALES_TERRITORY_id, EMAIL_address, LAST_NAME_name, GENDER_char, FAX_phone, REGION_name, COUNTRY_id, CITY_name, LANGUAGE_name, FIRST_NAME_name, COUNTRY_name, CURRENCY_name, TERRITORY_name, POSTAL_ZONE_code, ACTIVE_INDICATOR_b

## Retailer ETL

In [44]:
# Merge
retailer_etl = pd.merge(retailer, retailer_headquarters, on='RETAILER_CODEMR')
retailer_etl = pd.merge(retailer_etl, retailer_type, on='RETAILER_TYPE_CODE')

# Merge and rename language columns for clarity
retailer_etl = pd.merge(retailer_etl, retailer_segment, on='SEGMENT_CODE').rename(columns={'LANGUAGE':'SEGMENT_LANGUAGE_code'})
retailer_etl = pd.merge(retailer_etl, country, on='COUNTRY_CODE').rename(columns={'LANGUAGE':'COUNTRY_LANGUAGE_code'})

# Exclude columns early due to merge naming conflicts
retailer_etl = excludeColumns(retailer_etl, ['TRIAL219','TRIAL222_x','TRIAL222_y','TRIAL222'])

# Rename
retailer_etl = pd.merge(retailer_etl, sales_territory, on='SALES_TERRITORY_CODE')\
    .rename(columns=rename_mapping)

# Exclude
retailer_etl = filterColumns(retailer_etl)

# Assert
sizeCheck(retailer_etl,22)
retailer_etl

# Create
createTable(retailer_etl, 'RETAILER_id')
insertTable(retailer_etl, 'RETAILER_id')

Table has 22 columns
CREATE TABLE RETAILER (RETAILER_id INT NOT NULL PRIMARY KEY, PHONE_phone NVARCHAR(30), FLAG_image NVARCHAR(60), ADDRESS1_address NVARCHAR(80), RETAILER_MR_id INT, SALES_TERRITORY_id INT, RETAILER_TYPE_name NVARCHAR(80), SEGMENT_code NVARCHAR(40), FAX_phone NVARCHAR(30), REGION_name NVARCHAR(80), COUNTRY_LANGUAGE_id INT, COUNTRY_id INT, SEGMENT_LANGUAGE_id INT, CITY_name NVARCHAR(80), COUNTRY_name NVARCHAR(80), CURRENCY_name NVARCHAR(80), TERRITORY_name NVARCHAR(80), COMPANY_name NVARCHAR(80), RETAILER_name NVARCHAR(80), POSTAL_ZONE_code NVARCHAR(40), RETAILER_TYPE_id INT, ADDRESS2_address NVARCHAR(80))


INSERT INTO RETAILER (RETAILER_id, PHONE_phone, FLAG_image, ADDRESS1_address, RETAILER_MR_id, SALES_TERRITORY_id, RETAILER_TYPE_name, SEGMENT_code, FAX_phone, REGION_name, COUNTRY_LANGUAGE_id, COUNTRY_id, SEGMENT_LANGUAGE_id, CITY_name, COUNTRY_name, CURRENCY_name, TERRITORY_name, COMPANY_name, RETAILER_name, POSTAL_ZONE_code, RETAILER_TYPE_id, ADDRESS2_address) VALUES (100, '1 (402) 475-4317', 'F03', '2845 South Second Street', '70', '1', 'Golf Shop', '3', '1 (402) 475-4717', 'Nebraska', 'EN', '3', 'EN', 'Lincoln', 'United States', 'dollars', 'Americas', 'Golf Masters', 'Golf Masters', '68538', '1', 'None');

INSERT INTO RETAILER (RETAILER_id, PHONE_phone, FLAG_image, ADDRESS1_address, RETAILER_MR_id, SALES_TERRITORY_id, RETAILER_TYPE_name, SEGMENT_code, FAX_phone, REGION_name, COUNTRY_LANGUAGE_id, COUNTRY_id, SEGMENT_LANGUAGE_id, CITY_name, COUNTRY_name, CURRENCY_name, TERRITORY_name, COMPANY_name, RETAILER_name, POSTAL_ZONE_code, RETAILER_TYPE_id, ADDRESS2_address) VALUES (101, '1 (

## Order ETL

In [45]:
# Merge
order_etl = pd.merge(order_header, order_method, on='ORDER_METHOD_CODE').rename(columns=rename_mapping)

# Exclude redundant foreign key columns
# RETAILER_SITE_code can be derived from RETAILER_CONTACT_id
# SALES_BRANCH_code can be derived from SALES_STAFF_id
order_etl = excludeColumns(order_etl, ['RETAILER_SITE_id', 'SALES_BRANCH_id'])

# Exclude
order_etl = filterColumns(order_etl)

# Assert
sizeCheck(order_etl,7)
order_etl

# Create
createTable(order_etl, 'ORDER_TABLE_id')
insertTable(order_etl, 'ORDER_TABLE_id')

Table has 7 columns
CREATE TABLE ORDER_TABLE (ORDER_TABLE_id INT NOT NULL PRIMARY KEY, ORDER_DATE_date NVARCHAR(30), RETAILER_name NVARCHAR(80), ORDER_METHOD_name NVARCHAR(80), ORDER_METHOD_id INT, RETAILER_CONTACT_id INT, SALES_STAFF_id INT)


## Return reason ETL

In [46]:
# Rename
return_reason_etl = return_reason.rename(columns=rename_mapping)

# Exclude
return_reason_etl = filterColumns(return_reason_etl)

# Assert
sizeCheck(return_reason_etl,2)
return_reason_etl

# Create
createTable(return_reason_etl, 'RETURN_REASON_id')
insertTable(return_reason_etl, 'RETURN_REASON_id')

Table has 2 columns
CREATE TABLE RETURN_REASON (RETURN_REASON_id INT NOT NULL PRIMARY KEY, RETURN_REASON_description NTEXT)


## Returned Item ETL

In [47]:
# Rename 
returned_item_etl = returned_item.rename(columns=rename_mapping)

# Exclude 
returned_item_etl = filterColumns(returned_item_etl)

# Assert
sizeCheck(returned_item_etl,5)
returned_item_etl

# Create
createTable(returned_item_etl, 'RETURNS_id')
insertTable(returned_item_etl, 'RETURNS_id')

Table has 5 columns
CREATE TABLE RETURNS (RETURNS_id INT NOT NULL PRIMARY KEY, RETURN_REASON_id INT, RETURN_DATE_date NVARCHAR(30), ORDER_DETAIL_id INT, RETURN_QUANTITY_number INT)


## Order Details ETL

In [48]:
# Rename
order_detail_etl = order_details.rename(columns=rename_mapping)

# Exclude
order_detail_etl = filterColumns(order_detail_etl)

# Assert
sizeCheck(order_detail_etl,7)
order_detail_etl

# Create
createTable(order_detail_etl, 'ORDER_DETAIL_id')
insertTable(order_detail_etl, 'ORDER_DETAIL_id')

Table has 7 columns
CREATE TABLE ORDER_DETAIL (ORDER_DETAIL_id INT NOT NULL PRIMARY KEY, UNIT_SALE_PRICE_money DECIMAL(19,4), UNIT_PRICE_money DECIMAL(19,4), QUANTITY_number INT, UNIT_COST_money DECIMAL(19,4), ORDER_TABLE_id INT, PRODUCT_id INT)
