## ETL
#### Contributors:
##### Team 8: Anthony Ung, Sean Jerzewski, Gideon Kipkorir

## 0. Dependencies and Global Variables

In [2]:
import os
from enum import Enum
import csv
import sqlite3 as lite
from decimal import Decimal
from datetime import date, datetime, timedelta

## 1. Gather the file paths
  
  
## IMPORTANT: 
#### Most of these files are untracked on GitHub.  
####   &emsp; &emsp; It is each team member's individual responsibility  
####   &emsp; &emsp; to build the database and CSV files for themselves using the other Jupyter notebooks. 

In [8]:
# SQLite files used by this notebook: Team 8's store database (source) and the
# Region C data mart (target). Paths are relative to the kernel's working
# directory (normally the folder containing this notebook).
FILE_PATHS = dict(
    DB_TEAM_8='./../12_Run_Transactions/store.db',
    DATA_MART='./../01_Source_DBs/Region_C_Data_Mart.db',
)

# Filled with `db` wrappers (one per FILE_PATHS key) once the Database API exists.
DB_HANDLES = dict()

In [5]:
# Verify every configured database file exists before doing any work, reporting
# each one, and stop the notebook run if anything is missing.
ALL_FILES_OK = True

for file_key, file_path in FILE_PATHS.items():
    if os.path.isfile(file_path):
        print(f'OK - {file_key} - \'{file_path}\'')
        continue
    ALL_FILES_OK = False
    print(f'MISSING - {file_key} - \'{file_path}\'')

if not ALL_FILES_OK:
    raise SystemExit('\nERROR!\nYou are missing files!\nRead and Follow the Cell instructions provided.')

OK - DB_TEAM_8 - './../12_Run_Transactions/store.db'
OK - DATA_MART_PATH - './../01_Source_DBs/Region_C_Data_Mart.db'


## 2. Initialize the Database File and the Database API

I originally wrote this database API in HW 2 and extended it throughout the semester.

In [6]:
# db_options is defined at module level rather than nested inside `db`:
# method bodies do not see names bound in the enclosing class body, so a nested
# class could not be referenced as `db_options` from inside the methods (the
# "class is undefined" error originally observed).
class db_options(Enum):
    """Result-handling mode for db.execute_sql / db.execute_sql_values.

    Values are tested with a bitwise AND: RETURN_RESULTS returns the fetched
    rows, PRINT_RESULTS prints each fetched row, and DEFAULT (0) matches
    neither, so the statement is executed and nothing is fetched.
    """
    DEFAULT = 0
    RETURN_RESULTS = 1
    PRINT_RESULTS = 2


class db:
    """Minimal wrapper around one SQLite database file.

    Call connect() before executing statements. close() commits any pending
    transaction and then closes the connection.
    """

    def __init__(self, name):
        # Path of the SQLite file (or ':memory:'), stored as a string.
        self.name = str(name)

    def connect(self):
        """Open a new connection and cursor on self.name."""
        self.con = lite.connect(self.name)
        self.cur = self.con.cursor()

    def build_table(self, name):
        """Drop table `name` if it exists, then recreate it.

        The CREATE statement comes from the module-level TABLE_DEFINITIONS
        dict. NOTE(review): TABLE_DEFINITIONS is not defined in this part of
        the notebook -- confirm it exists before calling this.
        `name` is interpolated into the SQL text directly (identifiers cannot
        be bound as parameters), so it must be a trusted table name.
        """
        self.execute_sql(f'DROP TABLE IF EXISTS {name}')
        self.execute_sql(TABLE_DEFINITIONS[name])

    def execute_sql(self, sql, options=db_options.DEFAULT):
        """Execute a single SQL statement that takes no parameters.

        Returns the fetched rows for RETURN_RESULTS, otherwise None
        (PRINT_RESULTS prints each row instead).
        """
        # An empty parameter sequence is equivalent to passing no parameters.
        return self.execute_sql_values(sql, (), options=options)

    def execute_sql_values(self, sql, values, options=db_options.DEFAULT):
        """Execute a single SQL statement with its placeholders bound to `values`.

        Args:
            sql: one SQL statement.
            values: the parameters for the statement's placeholders.
            options: a db_options member choosing what to do with the result.

        Returns:
            The list of fetched rows for RETURN_RESULTS; otherwise None
            (PRINT_RESULTS prints each row instead).
        """
        mode = options.value
        self.cur.execute(sql, values)
        if mode & db_options.RETURN_RESULTS.value:
            return self.cur.fetchall()
        if mode & db_options.PRINT_RESULTS.value:
            for row in self.cur.fetchall():
                print(row)
        return None

    def commit(self):
        """Commit the current transaction."""
        self.con.commit()

    def close(self):
        """Commit pending changes, then close the connection."""
        self.con.commit()
        self.con.close()

In [9]:
# Wrap each configured database file in a `db` handle (not yet connected).
for handle_key in ('DB_TEAM_8', 'DATA_MART'):
    DB_HANDLES[handle_key] = db(FILE_PATHS[handle_key])

## 4. Miscellany

#### Build an auxiliary lookup table in memory
Given a fact table of size `m` and a dimension table of size `n`, I note the following about time and space complexity:  
A naive (nested-loop) join is O(m*n), whereas one hash-map lookup per fact row is O(m) after an O(n) pass to build the map. The extra space required grows from O(1) to O(n).

#### Create Utility One-Line Functions
This was done to improve code readability.

#### Known Peculiarity
The old `product` table and the new `products` table coexist.  
I will keep both because I need the base prices.

In [10]:
# In-memory lookup: sku (as str) -> {'ProductKey', 'CostToStore'}, read from the
# data mart's `product` table, so each fact row needs one dict lookup instead
# of a JOIN. Callers also wrap skus in str(), so both sides use the same key form.
PRODUCTS_LOOKUP = {}

db_handle = DB_HANDLES['DATA_MART']
db_handle.connect()
try:
    sql = 'SELECT sku, ProductKey, CostToStore FROM product'
    results = db_handle.execute_sql(sql, options=db_options.RETURN_RESULTS)
    PRODUCTS_LOOKUP.update(
        (str(sku), {'ProductKey': product_key, 'CostToStore': cost_to_store})
        for sku, product_key, cost_to_store in results
    )
finally:
    # Release the data-mart connection even if the query fails.
    db_handle.close()

def round_money(amount):
    """Round a monetary amount to 2 decimal places.

    Uses the built-in round(), so binary float representation and
    round-half-to-even apply (e.g. round(2.675, 2) == 2.67).
    """
    return round(amount, 2)


def get_product_cost(sku):
    """Return the CostToStore for `sku` from PRODUCTS_LOOKUP.

    Raises KeyError if the sku is not in the lookup.
    """
    return PRODUCTS_LOOKUP[str(sku)]['CostToStore']


def get_case_count(qty, case_size=12):
    """Return the number of cases of `case_size` items needed to hold `qty` items.

    Integer ceiling division: 0 -> 0, 1..12 -> 1, 13..24 -> 2 (default size 12).
    """
    return (qty + case_size - 1) // case_size

## 5. Team 8's ETL

#### I. Build the Data Structures Necessary to ETL from Team 8's Database

In [14]:
# 'YYYY-MM-DD' date string -> sequential integer DateKey (1 = first day of the range).
DATE_KEYS = {}

def build_data_structures_8(start_date=date(2024, 1, 1), end_date=date(2025, 6, 30)):
    """Populate DATE_KEYS with one consecutive key per day, start_date..end_date inclusive.

    With the defaults, '2024-01-01' -> 1 through '2025-06-30' -> 547.
    NOTE(review): these keys are assumed to match the DateKey values of the
    data mart's date dimension; confirm before changing the range.
    Entries outside the given range are left untouched; an empty range
    (end_date < start_date) adds nothing.
    """
    num_days = (end_date - start_date).days + 1
    for offset in range(num_days):
        current_date = start_date + timedelta(days=offset)
        DATE_KEYS[current_date.strftime('%Y-%m-%d')] = offset + 1

#### II. Sales

In [15]:
def etl_team_8_sales(commit_every=1_000_000):
    """Copy Team 8's sales into the data mart's `sales_transactions` fact table.

    Groups Team 8's `sales_transactions` by (date, customer_number, sku) and
    inserts one row per group:
    (DateKey, DailyCustomerNumber, ProductKey, StoreKey, QuantitySold,
     TotalDollarSales, TotalCostToStore, GrossProfit).
    QuantitySold is COUNT(*) of the group's source rows
    (NOTE(review): presumably one source row per item sold -- confirm).

    Args:
        commit_every: commit after every this-many inserted rows; a falsy
            value commits only once at the end.

    Raises:
        KeyError: if a date is missing from DATE_KEYS or a sku from PRODUCTS_LOOKUP.
    """
    db_handle_old = DB_HANDLES['DB_TEAM_8']
    db_handle_new = DB_HANDLES['DATA_MART']

    # Adjacent literals concatenate with nothing in between, so keep an
    # explicit space before FROM.
    sql_retrieve = ('SELECT date, sku, customer_number, COUNT(*), SUM(salesPrice) '
                    'FROM sales_transactions GROUP BY date, customer_number, sku')
    sql_insert = 'INSERT INTO sales_transactions VALUES (?, ?, ?, ?, ?, ?, ?, ?)'
    StoreKey = 8

    db_handle_old.connect()
    db_handle_new.connect()
    try:
        print(f'{datetime.now()} - Started Query')
        results = db_handle_old.execute_sql(sql_retrieve, options=db_options.RETURN_RESULTS)

        print(f'{datetime.now()} - Started Insertions')
        num_records = 0
        for sale_date, sku, customer_number, item_count, sales_sum in results:
            DateKey = DATE_KEYS[sale_date]
            DailyCustomerNumber = customer_number
            ProductKey = PRODUCTS_LOOKUP[str(sku)]['ProductKey']
            QuantitySold = item_count  # COUNT(*) is already an integer
            TotalDollarSales = round_money(sales_sum)
            TotalCostToStore = round_money(item_count * get_product_cost(sku))
            GrossProfit = round_money(TotalDollarSales - TotalCostToStore)

            db_handle_new.execute_sql_values(sql_insert, values=(
                DateKey, DailyCustomerNumber, ProductKey, StoreKey,
                QuantitySold, TotalDollarSales, TotalCostToStore, GrossProfit))
            num_records += 1

            if commit_every and num_records % commit_every == 0:
                db_handle_new.commit()
                print(f'{datetime.now()} - Committed record {num_records}')

        # Commit before logging so the message is accurate.
        db_handle_new.commit()
        print(f'{datetime.now()} - Committed record {num_records}')
    finally:
        # Always release both connections so the data mart's write lock is not
        # left held after an error. db.close() commits whatever is pending.
        db_handle_new.close()
        db_handle_old.close()

In [16]:
def etl_team_8_sales_daily(commit_every=50_000):
    """Copy Team 8's per-product daily sales into the data mart's `sales_daily` table.

    Groups Team 8's `sales_transactions` by (date, sku) and inserts one row
    per group:
    (DateKey, ProductKey, StoreKey, QuantitySold, TotalDollarSales,
     TotalCostToStore, GrossProfit), with QuantitySold = COUNT(*) of the group.

    Args:
        commit_every: commit after every this-many inserted rows; a falsy
            value commits only once at the end.

    Raises:
        KeyError: if a date is missing from DATE_KEYS or a sku from PRODUCTS_LOOKUP.
    """
    db_handle_old = DB_HANDLES['DB_TEAM_8']
    db_handle_new = DB_HANDLES['DATA_MART']

    # Adjacent literals concatenate with nothing in between, so keep an
    # explicit space before FROM.
    sql_retrieve = ('SELECT date, sku, COUNT(*), SUM(salesPrice) '
                    'FROM sales_transactions '
                    'GROUP BY date, sku')
    sql_insert = 'INSERT INTO sales_daily VALUES (?, ?, ?, ?, ?, ?, ?)'
    StoreKey = 8

    db_handle_old.connect()
    db_handle_new.connect()
    try:
        print(f'{datetime.now()} - Started Query')
        results = db_handle_old.execute_sql(sql_retrieve, options=db_options.RETURN_RESULTS)

        print(f'{datetime.now()} - Started Insertions')
        num_records = 0
        for sale_date, sku, item_count, sales_sum in results:
            DateKey = DATE_KEYS[sale_date]
            ProductKey = PRODUCTS_LOOKUP[str(sku)]['ProductKey']
            QuantitySold = item_count
            TotalDollarSales = round_money(sales_sum)
            TotalCostToStore = round_money(item_count * get_product_cost(sku))
            GrossProfit = round_money(TotalDollarSales - TotalCostToStore)

            db_handle_new.execute_sql_values(sql_insert, values=(
                DateKey, ProductKey, StoreKey, QuantitySold,
                TotalDollarSales, TotalCostToStore, GrossProfit))
            num_records += 1

            if commit_every and num_records % commit_every == 0:
                db_handle_new.commit()
                print(f'{datetime.now()} - Committed record {num_records}')

        # Commit before logging so the message is accurate.
        db_handle_new.commit()
        print(f'{datetime.now()} - Committed record {num_records}')
    finally:
        # Always release both connections so the data mart's write lock is not
        # left held after an error. db.close() commits whatever is pending.
        db_handle_new.close()
        db_handle_old.close()

In [17]:
def etl_team_8_inventory(commit_every=100_000):
    """Copy Team 8's daily inventory levels into the data mart's `inventory_daily` table.

    Groups Team 8's `sales_transactions` by (date, sku) and inserts one row
    per group:
    (DateKey, ProductKey, StoreKey, NumAvailable, CostToStoreItem,
     CostToStore, NumCasesPurchasedToDate), where
      - NumAvailable = MIN(items_left) for the day
        (NOTE(review): equals the end-of-day count only if items_left never
        rises within a day, e.g. from an intra-day restock -- confirm),
      - CostToStoreItem = NumAvailable * unit cost,
      - CostToStore = cost of the whole cases needed to hold NumAvailable items,
      - NumCasesPurchasedToDate = MAX(cases_ordered) for the day.

    Args:
        commit_every: commit after every this-many inserted rows; a falsy
            value commits only once at the end.

    Raises:
        KeyError: if a date is missing from DATE_KEYS or a sku from PRODUCTS_LOOKUP.
    """
    db_handle_old = DB_HANDLES['DB_TEAM_8']
    db_handle_new = DB_HANDLES['DATA_MART']

    # Adjacent literals concatenate with nothing in between, so keep an
    # explicit space before FROM.
    sql_retrieve = ('SELECT sku, date, MIN(items_left), MAX(cases_ordered) '
                    'FROM sales_transactions '
                    'GROUP BY date, sku;')
    sql_insert = 'INSERT INTO inventory_daily VALUES (?, ?, ?, ?, ?, ?, ?)'
    StoreKey = 8
    items_per_case = 12  # matches get_case_count's default case size

    db_handle_old.connect()
    db_handle_new.connect()
    try:
        print(f'{datetime.now()} - Started Query')
        results = db_handle_old.execute_sql(sql_retrieve, options=db_options.RETURN_RESULTS)
        print(f'{datetime.now()} - Started Insertions')

        num_records = 0
        for sku, day, min_items_left, max_cases_ordered in results:
            DateKey = DATE_KEYS[day]
            ProductKey = PRODUCTS_LOOKUP[str(sku)]['ProductKey']
            unit_cost = get_product_cost(sku)
            NumAvailable = min_items_left
            CostToStoreItem = round_money(NumAvailable * unit_cost)
            CostToStore = round_money(items_per_case * get_case_count(NumAvailable) * unit_cost)
            NumCasesPurchasedToDate = max_cases_ordered

            db_handle_new.execute_sql_values(sql_insert, values=(
                DateKey, ProductKey, StoreKey, NumAvailable,
                CostToStoreItem, CostToStore, NumCasesPurchasedToDate))
            num_records += 1

            if commit_every and num_records % commit_every == 0:
                db_handle_new.commit()
                print(f'{datetime.now()} - Committed record {num_records}')

        # Commit before logging so the message is accurate.
        db_handle_new.commit()
        print(f'{datetime.now()} - Committed record {num_records}')
    finally:
        # Always release both connections so the data mart's write lock is not
        # left held after an error. db.close() commits whatever is pending.
        db_handle_new.close()
        db_handle_old.close()


In [18]:
def run_8():
    """Run Team 8's whole ETL in order.

    The steps are: build DATE_KEYS, then load sales, daily sales and daily
    inventory into the data mart.
    """
    pipeline = (
        build_data_structures_8,
        etl_team_8_sales,
        etl_team_8_sales_daily,
        etl_team_8_inventory,
    )
    for step in pipeline:
        step()

run_8()

2025-05-03 22:16:50.555410 - Started Query
2025-05-03 22:17:01.400757 - Started Insertions
2025-05-03 22:17:04.298727 - Committed record 1000000
2025-05-03 22:17:07.391618 - Committed record 2000000
2025-05-03 22:17:10.334554 - Committed record 3000000
2025-05-03 22:17:13.180986 - Committed record 4000000
2025-05-03 22:17:16.056160 - Committed record 5000000
2025-05-03 22:17:18.933568 - Committed record 6000000
2025-05-03 22:17:21.921528 - Committed record 7000000
2025-05-03 22:17:25.060614 - Committed record 8000000
2025-05-03 22:17:26.923441 - Committed record 8599756
2025-05-03 22:17:27.388762 - Started Query
2025-05-03 22:17:33.635891 - Started Insertions
2025-05-03 22:17:33.788919 - Committed record 50000
2025-05-03 22:17:33.953448 - Committed record 100000
2025-05-03 22:17:34.101648 - Committed record 150000
2025-05-03 22:17:34.245273 - Committed record 200000
2025-05-03 22:17:34.399075 - Committed record 250000
2025-05-03 22:17:34.552598 - Committed record 300000
2025-05-03 22:1

## 6. Generate Quarterly Snapshots

Conceptual hurdles identified
1. We need to do aggregation by quarter, which suggests a JOIN between the `inventory_daily` and the `date` tables
2. We need the last inventory fact for each (Store, Date, Product) tuple  
    &emsp; &emsp; to get each tuple's `CasesOnHand` and `CasesPurchasedToDate`,  
    &emsp; &emsp; which are non-additive.  
    &emsp; &emsp; Some (Store, Date, Product) keys may have no inventory fact  
    &emsp; &emsp; because nothing was sold that day, so we need to LEFT JOIN multiple tables  
    &emsp; &emsp; and do a full table scan of each table at least once for each missing (Store, Date, Product) tuple.  
3. We need to aggregate by quarter to generate the following:  
    &emsp; &emsp; (1) Total costs and Counts sold by the store in the current quarter  
    &emsp; &emsp; (2) Total costs and counts sold by the store YTD.  
    &emsp; &emsp; Generating (2) involves a self-JOIN on already-aggregated data (which warrants the use of a CTE)


In [19]:
def build_date_mapping_tables(start_date=date(2025, 1, 1), end_date=date(2025, 6, 30)):
    """(Re)create a `date` table mapping each day to its calendar quarter.

    For every handle in `db_handles`, this drops any existing `date` table,
    recreates it, and inserts one row per day from start_date to end_date
    inclusive: ('YYYY-MM-DD', quarter 1-4).

    NOTE(review): the quarter column carries no year, so a range spanning
    several years would give e.g. Q1 2024 and Q1 2025 the same quarter value.
    """
    db_handles = [DB_HANDLES['DB_TEAM_8']]

    # The dates are 'YYYY-MM-DD' strings, so the column is declared TEXT.
    # SQLite stored them as TEXT under the old INT declaration too, because
    # they are not integer literals.
    sql_table_creation = '''
                            CREATE TABLE date(
                                date TEXT, 
                                quarter INT
                            )
                        '''
    sql_insert = 'INSERT INTO date VALUES (?, ?)'

    # Build the rows once, then write them to every handle. The old code only
    # filled the first handle even though it created the table on all of them.
    rows = []
    current_date = start_date
    while current_date <= end_date:
        quarter = (current_date.month + 2) // 3  # months 1-3 -> 1, ..., 10-12 -> 4
        rows.append((current_date.strftime('%Y-%m-%d'), quarter))
        current_date += timedelta(days=1)

    # `handle` rather than `db`, to avoid shadowing the `db` class name.
    for handle in db_handles:
        handle.connect()
        try:
            handle.execute_sql('DROP TABLE IF EXISTS date')
            handle.execute_sql(sql_table_creation)
            for values in rows:
                handle.execute_sql_values(sql_insert, values)
        finally:
            # db.close() commits, then closes the connection.
            handle.close()
    
build_date_mapping_tables()