## Build Data Mart
#### Contributors:
##### Team 8: Anthony Ung, Sean Jerzewski, Gideon Kipkorir
##### Team 9: Rohith, Sneha Dasarla
##### Team 10: Anmol Brahmbhatt, Nikita Brahmbhatt, Satya

## 0. Dependencies and Global Variables

In [1]:
import os
from enum import Enum
import csv
import sqlite3 as lite
from decimal import Decimal
from datetime import date, datetime, timedelta

In [2]:
# Registry of database handles keyed by a logical name (filled in by later cells).
DB_HANDLES = dict()

#

## 1. Gather the file paths
  
  
## IMPORTANT: 
#### Most of these files are untracked on GitHub.
####   &emsp; &emsp; It is each team member's individual responsibility
####   &emsp; &emsp; to build the Database and CSV files for themselves using the other Jupyter notebooks.

In [3]:
# Input files: the three source-team SQLite databases and the mapped-products CSV.
# All paths are relative, so they resolve against the current working directory.
FILE_PATHS = dict(
    DB_TEAM_8='./../0_SD_Team_8/store_team_8.db',
    DB_TEAM_9='./../0_SD_Team_9/grocery_store.db',
    DB_TEAM_10='./../0_SD_Team_10/grocery_team_10_v2.db',
    PRODUCTS_CSV='./../2_Product_Mapping/PRODUCTS_MAPPED.csv',
)

# Output file: the combined data mart produced by this notebook.
DATA_MART_PATH = './Region_C_Data_Mart.db'

In [4]:
# Report every required input file as OK or MISSING, then stop the notebook
# if at least one of them is absent.
ALL_FILES_OK = True

for file_key, file_name in FILE_PATHS.items():
    if not os.path.isfile(file_name):
        ALL_FILES_OK = False
        print(f'MISSING - {file_key} - \'{file_name}\'')
        continue
    print(f'OK - {file_key} - \'{file_name}\'')

if ALL_FILES_OK is False:
    raise SystemExit('\n' "ERROR!" '\n' "You are missing files!" '\n' "Read and Follow the Cell instructions provided.")

OK - DB_TEAM_8 - './../0_SD_Team_8/store_team_8.db'
OK - DB_TEAM_9 - './../0_SD_Team_9/grocery_store.db'
OK - DB_TEAM_10 - './../0_SD_Team_10/grocery_team_10_v2.db'
OK - PRODUCTS_CSV - './../2_Product_Mapping/PRODUCTS_MAPPED.csv'


#

## 2. Compile the table definitions

In [5]:
# TABLE_DEFINITIONS maps a table name in the data mart to the CREATE TABLE
# statement that builds it. Each statement is assembled from adjacent string
# literals, one column per fragment.
# Not every table defined here is populated by this notebook; per the original
# author, some are kept for a later assignment.
TABLE_DEFINITIONS = {
    # ---- dimension tables ----
    'date': (
        'CREATE TABLE date('
        'DateKey INT, '
        'PrettyDate TEXT, '
        'DayNumberInMonth INT, '
        'DayNumberInYear INT, '
        'WeekNumberInYear INT, '
        'MonthNum INT, '
        'MonthTxt TEXT, '
        'Quarter INT, '
        'Year INT,'
        'FiscalYear INT, '
        'isHoliday INT, '
        'isWeekend INT, '
        'Season TEXT'
        ')'
    ),

    'product': (
        'CREATE TABLE product('
        'ProductKey INT,'
        'sku INT,'
        'product_name TEXT, '
        'product_class_id INT, '
        'subcategory TEXT, '
        'category TEXT, '
        'department TEXT, '
        'product_family TEXT, '
        'size TEXT, '
        'case_count INT, '
        'BrandName TEXT, '
        'Manufacturer TEXT, '
        'Supplier TEXT, '
        'CostToStore REAL)'
    ),

    'product_metadata': (
        'CREATE TABLE product_metadata('
        'ProductKey INT,'
        'sku INT,'
        'old_type TEXT, '
        'meta_code INT,'
        'meta_mapped_by TEXT, '
        'meta_reason TEXT)'
    ),

    'store': (
        'CREATE TABLE store('
        'StoreKey INT, '
        'StoreManager TEXT, '
        'StoreStreetAddr TEXT, '
        'StoreTown TEXT, '
        'StoreZipCode TEXT, '
        'StorePhoneNumber TEXT, '
        'StoreState TEXT'
        ')'
    ),

    # ---- fact tables ----
    'sales_transactions': (
        'CREATE TABLE sales_transactions('
        'DateKey INT, '
        'DailyCustomerNumber INT, '
        'ProductKey INT, '
        'StoreKey INT, '
        'QuantitySold INT, '
        'TotalDollarSales REAL, '
        'TotalCostToStore REAL, '
        'GrossProfit REAL)'
    ),

    'sales_daily': (
        'CREATE TABLE sales_daily('
        'DateKey INT, '
        'ProductKey INT, '
        'StoreKey INT, '
        'QuantitySoldToday INT, '
        'CostOfItemsSold REAL, '
        'SalesTotal REAL, '
        'GrossProfit REAL)'
    ),

    'inventory_daily': (
        'CREATE TABLE inventory_daily('
        'DateKey INT, '
        'ProductKey INT, '
        'StoreKey INT, '
        'NumAvailable INT, '
        'CostToStoreItem FLOAT, '
        'CostToStore FLOAT, '
        'NumCasesPurchasedToDate INT)'
    ),

    'inventory_quarterly': (
        'CREATE TABLE inventory_quarterly('
        'ProductKey INT, '
        'StoreKey INT, '
        'Quarter INT, '
        'Year INT, '
        'CasesPurchasedToDate INT, '
        'CasesPurchasedThisQuarter INT, '
        'CasesOnHand INT, '
        'TotalCostToStoreThisQuarter FLOAT, '
        'TotalSoldByStoreThisQuarter FLOAT, '
        'TotalCostToStoreThisYTD FLOAT, '
        'TotalSoldByStoreThisYTD FLOAT)'
    ),
}


#

## 3. Initialize the Database File and the Database API
#### Note: The first cell in this block is destructive.
#### If you need to see multiple versions of the database side-by-side, rename the db file before rerunning this notebook.

In [6]:
# Destructive: delete any data mart left over from an earlier run so the
# notebook always rebuilds it from scratch.
if os.path.isfile(DATA_MART_PATH):
    os.unlink(DATA_MART_PATH)

In [7]:
# Common access layer for the SQLite databases used in this notebook.
# Everything that writes to the data mart goes through the `db` class below.
class db_options(Enum):
    """Result-handling modes accepted by `db.execute_sql` / `db.execute_sql_values`."""
    DEFAULT = 0          # run the statement and ignore any rows
    RETURN_RESULTS = 1   # fetch every row and return the list
    PRINT_RESULTS = 2    # fetch every row and print each one


class db:
    """Thin wrapper around one sqlite3 connection and one cursor.

    The connection is not opened by the constructor; call `connect()` first
    and `close()` when finished.
    """

    def __init__(self, name):
        # Database file path (or ':memory:'), normalised to a string.
        self.name = f"{name}"

    def connect(self):
        """Open the database and create the cursor used by every execute call."""
        self.con = lite.connect(self.name)
        self.cur = self.con.cursor()

    def build_table(self, name):
        """Drop table *name* if present and recreate it from TABLE_DEFINITIONS."""
        self.execute_sql(f'DROP TABLE IF EXISTS {name}')
        self.execute_sql(TABLE_DEFINITIONS[name])

    def _run(self, options, *statement):
        """Execute *statement* (sql, optionally followed by its bind values)
        and handle the result as requested by *options*."""
        mode = options.value
        cursor = self.cur.execute(*statement)
        if mode & db_options.RETURN_RESULTS.value:
            return cursor.fetchall()
        if mode & db_options.PRINT_RESULTS.value:
            for row in cursor.fetchall():
                print(row)
        return None

    def execute_sql(self, sql, options=db_options.DEFAULT):
        """Run *sql* without bind values.

        Returns the fetched rows for RETURN_RESULTS, otherwise None
        (PRINT_RESULTS prints each row instead).
        """
        return self._run(options, sql)

    def execute_sql_values(self, sql, values, options=db_options.DEFAULT):
        """Run *sql* with *values* bound to its placeholders.

        Returns the fetched rows for RETURN_RESULTS, otherwise None
        (PRINT_RESULTS prints each row instead).
        """
        return self._run(options, sql, values)

    def commit(self):
        """Commit the current transaction."""
        self.con.commit()

    def close(self):
        """Commit any pending work, then close the connection."""
        self.con.commit()
        self.con.close()

In [8]:
# One (not yet connected) handle per source-team database ...
for _source_key in ('DB_TEAM_8', 'DB_TEAM_9', 'DB_TEAM_10'):
    DB_HANDLES[_source_key] = db(FILE_PATHS[_source_key])

# ... plus one for the data mart that this notebook builds.
DB_HANDLES['DATA_MART'] = db(DATA_MART_PATH)

#

## 4. Build the Dimension Tables

#### Product Dimension
The presence of the CSV generated by the script is checked earlier.

In [9]:
def build_product_table():
    """Rebuild the `product` and `product_metadata` tables from the products CSV.

    Both tables are dropped and recreated, then one row per CSV record is
    inserted into each. The CSV must provide the columns read below
    (product_id, SKU, Product Name, product_class_id, product_subcategory,
    product_category, product_department, product_family, Size, Manufacturer,
    Supplier, BasePrice, itemType, meta_code, meta_mapped_by, meta_reason).

    Raises:
        KeyError: a CSV record lacks one of the expected columns.
        decimal.InvalidOperation: BasePrice is not a number after '$' is stripped.
    """
    db_handle = DB_HANDLES['DATA_MART']

    insert_product = 'insert into product values ' \
                     '(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'
    insert_metadata = 'insert into product_metadata values (?, ?, ?, ?, ?, ?)'

    # newline='' lets the csv module handle line endings itself, so quoted
    # fields that contain embedded newlines are parsed correctly.
    with open(FILE_PATHS['PRODUCTS_CSV'], 'r', newline='') as csvfile:
        db_handle.connect()
        try:
            db_handle.build_table('product')
            db_handle.build_table('product_metadata')

            for row in csv.DictReader(csvfile):
                product_key = row['product_id']
                sku = row['SKU']
                product_name = row['Product Name']
                product_class_id = row['product_class_id']
                product_subcategory = row['product_subcategory']
                product_category = row['product_category']
                product_department = row['product_department']
                product_family = row['product_family']
                size = row['Size']
                case_count = 12  # every product is treated as 12 items per case
                # NOTE(review): BrandName is filled from product_subcategory.
                # This looks like a copy-paste slip; confirm whether the CSV
                # has a dedicated brand column before changing it.
                brand_name = row['product_subcategory']
                manufacturer = row['Manufacturer']
                supplier = row['Supplier']
                # BasePrice looks like '$1.23': strip the currency sign and
                # parse through Decimal before rounding to cents.
                cost_to_store = round(float(Decimal(row['BasePrice'].strip('$'))), 2)

                old_type = row['itemType']
                meta_code = row['meta_code']
                meta_mapped_by = row['meta_mapped_by']
                meta_reason = row['meta_reason']

                db_handle.execute_sql_values(
                    sql=insert_product,
                    values=(product_key, sku, product_name,
                            product_class_id, product_subcategory, product_category,
                            product_department, product_family,
                            size, case_count,
                            brand_name, manufacturer, supplier, cost_to_store))

                db_handle.execute_sql_values(
                    sql=insert_metadata,
                    values=(product_key, sku, old_type, meta_code, meta_mapped_by, meta_reason))

            print('Product and Product Metadata Tables Populated')
            db_handle.commit()
        finally:
            # Always release the connection. On success the commit above has
            # already run; on failure, closing without committing discards the
            # half-loaded rows instead of leaving the data mart locked.
            db_handle.con.close()

build_product_table()

Product and Product Metadata Tables Populated


#### Store Dimension
Code originally written by Gideon Kipkorir

In [10]:
# Rows of the store dimension, one per team's store. Each record lists its
# values in the order given by _STORE_FIELDS.
_STORE_FIELDS = (
    "StoreKey", "StoreManager", "StoreStreetAddr", "StoreTown",
    "StoreZipCode", "StorePhone#", "StoreState",
)

data = [
    dict(zip(_STORE_FIELDS, record))
    for record in (
        (8, "Anthony-Sean-Gideon", "1180 Seven Seas Dr", "Orlando",
         "32836", "(407) 824-4500", "FL"),
        (9, "Rohith-Sneha", "201 Mullica Hill Road", "Glassboro",
         "08028", "(856) 424-2222 x2500", "NJ"),
        (10, "Anmol-Nikita-Satya", "620 Anthony Ung Drive", "Miami",
         "33130", "(856) 663-8006", "FL"),
    )
]

def build_store_dimension():
    """Recreate the `store` table and insert one row per entry of `data`."""
    mart = DB_HANDLES['DATA_MART']
    mart.connect()
    mart.build_table('store')

    # Dictionary keys, listed in the column order of the `store` table.
    field_order = ('StoreKey', 'StoreManager', 'StoreStreetAddr', 'StoreTown',
                   'StoreZipCode', 'StorePhone#', 'StoreState')

    for store in data:
        record = tuple(store[field] for field in field_order)
        mart.execute_sql_values(sql='insert into store values \
                                    (?, ?, ?, ?, ?, ?, ?)', values=record)

    mart.commit()
    mart.close()
    print('Store Dimension Successfully Built')


build_store_dimension()

Store Dimension Successfully Built


#### Date Dimension
Logic originally written by Sean Jerzewski

In [11]:
def build_date_dimension():
    """Recreate the `date` table with one row per calendar day of 2024.

    DateKey and DayNumberInYear are both the 1-based day of the year. The
    holiday and weekend flags are passed to SQLite as the strings
    'True' / 'False'.
    """
    mart = DB_HANDLES['DATA_MART']
    mart.connect()
    mart.build_table('date')

    first_day = date(2024, 1, 1)
    last_day = date(2024, 12, 31)

    holidays = [
        "2024-01-01", "2024-01-15", "2024-02-19", "2024-03-29",
        "2024-05-27", "2024-06-21", "2024-07-04", "2024-09-02",
        "2024-10-14", "2024-11-05", "2024-11-11", "2024-11-28",
        "2024-12-25",
    ]

    # First day of each season; any date outside spring..winter is Winter.
    spring = date(2024, 3, 21)
    summer = date(2024, 6, 21)
    fall = date(2024, 9, 21)
    winter = date(2024, 12, 21)

    def season_of(day):
        """Return the season label for *day*."""
        if spring <= day < summer:
            return "Spring"
        if summer <= day < fall:
            return "Summer"
        if fall <= day < winter:
            return "Fall"
        return "Winter"

    total_days = (last_day - first_day).days + 1
    for offset in range(total_days):
        today = first_day + timedelta(days=offset)
        day_number = offset + 1

        pretty = today.strftime('%Y-%m-%d')
        month_num = today.strftime('%m')
        quarter = (int(month_num) + 2) // 3
        # The fiscal year switches from 2023 to 2024 on August 1st.
        fiscal_year = 2024 if today.month >= 8 else 2023
        is_holiday = 'True' if pretty in holidays else 'False'
        # weekday() is 0..4 for Monday..Friday and 5..6 for the weekend.
        is_weekend = 'True' if today.weekday() >= 5 else 'False'

        record = (
            day_number,               # DateKey
            pretty,                   # PrettyDate
            today.strftime('%d'),     # DayNumberInMonth
            day_number,               # DayNumberInYear
            today.strftime('%W'),     # WeekNumberInYear
            month_num,                # MonthNum
            today.strftime('%B'),     # MonthTxt
            quarter,                  # Quarter
            today.year,               # Year
            fiscal_year,              # FiscalYear
            is_holiday,               # isHoliday
            is_weekend,               # isWeekend
            season_of(today),         # Season
        )
        mart.execute_sql_values(sql='insert into date values \
                                    (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)', values=record)

    mart.commit()
    mart.close()
    print('Date Dimension Successfully Built')


build_date_dimension()

Date Dimension Successfully Built


#

## 5. Build the tables

#### Create the table

In [12]:
# Create (or recreate) the empty fact tables that the ETL steps below fill.
db_handle = DB_HANDLES['DATA_MART']
db_handle.connect()
for _fact_table in ('sales_transactions', 'inventory_daily',
                    'sales_daily', 'inventory_quarterly'):
    db_handle.build_table(_fact_table)
db_handle.commit()
db_handle.close()


#### Build an auxiliary lookup table in memory
Joins are O(m*n) whereas one lookup per row is O(m). The space requirement changes from O(1) to O(n)

In [31]:
# In-memory index of the product dimension:
#     str(sku) -> {'ProductKey': ..., 'CostToStore': ...}
# The ETL loops consult this dictionary per row instead of joining against
# the product table.
PRODUCTS_LOOKUP = {}

db_handle = DB_HANDLES['DATA_MART']
db_handle.connect()

sql = 'SELECT sku, ProductKey, CostToStore FROM product'
results = db_handle.execute_sql(sql, options=db_options.RETURN_RESULTS)
# NOTE: this loop runs at module scope, so `row` stays bound to the last
# product row after the cell finishes. Do not rely on it from other cells.
for row in results:
    # Keys are stringified; callers look entries up with str(sku).
    PRODUCTS_LOOKUP[str(row[0])] = {'ProductKey': row[1], 'CostToStore': row[2]}

# close() commits first; this cell only reads, so there is nothing to commit.
db_handle.close()

#### Create Utility Functions
These small helper functions were added to improve code readability.

In [14]:
def round_money(amount):
    """Return *amount* rounded to two decimal places (cents)."""
    cents_precision = 2
    return round(amount, cents_precision)
def get_product_cost(sku):
    """Return the CostToStore recorded in PRODUCTS_LOOKUP for *sku*.

    Raises KeyError when the sku is not in the lookup.
    """
    product = PRODUCTS_LOOKUP[str(sku)]
    return product['CostToStore']
def get_case_count(qty):
    """Round *qty* items up to a whole number of 12-item cases.

    Despite the name, the result is an item count (a multiple of 12), not a
    number of cases: 0 -> 0, 1..12 -> 12, 13..24 -> 24, and so on.

    The previous version ignored *qty* and read the module-level `row[2]`,
    so every call returned a value unrelated to its argument.
    """
    return ((qty + 11) // 12) * 12

#

## 6. Team 8's ETL

#### I. Build the Data Structures Necessary to ETL from Team 8's Database

In [15]:
# Maps a source date string to the DateKey used in the data mart.
DATE_KEYS = {}

def build_data_structures_8():
    """Fill DATE_KEYS with 'YYYY-MM-DD' -> day-of-year (1-based) for all of 2024."""
    first_day = date(2024, 1, 1)
    total_days = (date(2024, 12, 31) - first_day).days + 1

    for offset in range(total_days):
        day = first_day + timedelta(days=offset)
        DATE_KEYS[day.strftime('%Y-%m-%d')] = offset + 1

#### II. Sales

In [17]:
def etl_team_8_sales():
    """Load Team 8's sales into the data mart's `sales_transactions` table.

    The source rows are grouped by (date, customer_number, sku); every group
    becomes one fact row for StoreKey 8. Inserts are committed every
    1,000,000 rows and once more at the end.

    Needs DATE_KEYS and PRODUCTS_LOOKUP to be populated; an unknown date or
    sku raises KeyError.
    """
    source = DB_HANDLES['DB_TEAM_8']
    source.connect()

    sql_retrieve = 'SELECT date, sku, customer_number, COUNT(*), SUM(salesPrice )' \
            'FROM sales_transactions GROUP BY date, customer_number, sku'

    target = DB_HANDLES['DATA_MART']
    target.connect()

    sql_insert = 'INSERT INTO sales_transactions VALUES (?, ?, ?, ?, ?, ?, ?, ?)'

    print(f'{datetime.now()} - Started Query')
    grouped_rows = source.execute_sql(sql_retrieve, options=db_options.RETURN_RESULTS)

    print(f'{datetime.now()} - Started Insertions')
    num_records = 0
    for num_records, group in enumerate(grouped_rows, start=1):
        sale_date, sku, customer_number, line_count, sales_sum = group

        date_key = DATE_KEYS[sale_date]
        product_key = PRODUCTS_LOOKUP[str(sku)]['ProductKey']
        quantity = round_money(line_count)
        dollars = round_money(sales_sum)
        cost = round_money(line_count * get_product_cost(sku))
        profit = round(dollars - cost, 2)

        target.execute_sql_values(
            sql_insert,
            values=(date_key, customer_number, product_key, 8,
                    quantity, dollars, cost, profit))

        # Commit in large batches to bound the size of the open transaction.
        if num_records % 1000000 == 0:
            target.commit()
            print(f'{datetime.now()} - Committed record {num_records}')

    print(f'{datetime.now()} - Committed record {num_records}')
    target.commit()
    target.close()

    source.close()

#### III. Roll Sales Up

In [19]:
def etl_team_8_sales_daily():
    """Load Team 8's sales, rolled up per (date, sku), into `sales_daily`.

    Every group becomes one fact row for StoreKey 8. Inserts are committed
    every 50,000 rows and once more at the end.

    The INSERT names its target columns explicitly. The previous positional
    insert stored the sales total in CostOfItemsSold and the cost in
    SalesTotal, i.e. the two columns were swapped.

    Needs DATE_KEYS and PRODUCTS_LOOKUP to be populated; an unknown date or
    sku raises KeyError.
    """
    db_handle_old = DB_HANDLES['DB_TEAM_8']
    db_handle_old.connect()

    sql_retrieve = 'SELECT date, sku, COUNT(*), SUM(salesPrice )' \
                    'FROM sales_transactions ' \
                    'GROUP BY date, sku'

    db_handle_new = DB_HANDLES['DATA_MART']
    db_handle_new.connect()

    sql_insert = 'INSERT INTO sales_daily ' \
                 '(DateKey, ProductKey, StoreKey, QuantitySoldToday, ' \
                 'CostOfItemsSold, SalesTotal, GrossProfit) ' \
                 'VALUES (?, ?, ?, ?, ?, ?, ?)'

    num_records = 0

    print(f'{datetime.now()} - Started Query')
    results = db_handle_old.execute_sql(sql_retrieve, options=db_options.RETURN_RESULTS)

    print(f'{datetime.now()} - Started Insertions')
    for row in results:
        DateKey = DATE_KEYS[row[0]]
        ProductKey = PRODUCTS_LOOKUP[str(row[1])]['ProductKey']
        StoreKey = 8
        QuantitySold = row[2]
        TotalDollarSales = round_money(row[3])
        TotalCostToStore = round_money((row[2] * get_product_cost(row[1])))
        GrossProfit = round_money((TotalDollarSales - TotalCostToStore))

        # Order matches the column list in sql_insert: cost first, then sales.
        values = (DateKey, ProductKey, StoreKey,
                  QuantitySold, TotalCostToStore, TotalDollarSales, GrossProfit)

        num_records += 1
        db_handle_new.execute_sql_values(sql_insert, values=values)

        if(num_records % 50000 == 0):
            db_handle_new.commit()
            print(f'{datetime.now()} - Committed record {num_records}')

    print(f'{datetime.now()} - Committed record {num_records}')
    db_handle_new.commit()
    db_handle_new.close()

    db_handle_old.close()

#### IV. Inventory

In [23]:
def etl_team_8_inventory():
    """Load Team 8's per-day, per-sku inventory snapshot into `inventory_daily`.

    For each (date, sku) group the source query returns the smallest
    items_left and the largest cases_ordered. CostToStoreItem is items_left
    times the product cost; CostToStore is get_case_count(items_left) times
    the product cost. Rows are written for StoreKey 8 and committed every
    100,000 rows and once more at the end.

    Needs DATE_KEYS and PRODUCTS_LOOKUP to be populated; an unknown date or
    sku raises KeyError.
    """
    source = DB_HANDLES['DB_TEAM_8']
    source.connect()

    sql_retrieve = 'SELECT sku, date, MIN(items_left), MAX(cases_ordered)' \
                    'FROM sales_transactions ' \
                    'GROUP BY date, sku;'

    print(f'{datetime.now()} - Started Query')
    snapshot_rows = source.execute_sql(sql_retrieve, options=db_options.RETURN_RESULTS)
    print(f'{datetime.now()} - Started Insertions')

    target = DB_HANDLES['DATA_MART']
    target.connect()

    sql_insert = 'INSERT INTO inventory_daily VALUES (?, ?, ?, ?, ?, ?, ?)'

    num_records = 0
    for num_records, snapshot in enumerate(snapshot_rows, start=1):
        sku, day, items_left, cases_ordered = snapshot

        date_key = DATE_KEYS[day]
        product_key = PRODUCTS_LOOKUP[str(sku)]['ProductKey']
        item_cost = round_money(items_left * get_product_cost(sku))
        case_cost = round_money(get_case_count(items_left) * get_product_cost(sku))

        target.execute_sql_values(
            sql_insert,
            values=(date_key, product_key, 8, items_left,
                    item_cost, case_cost, cases_ordered))

        if num_records % 100000 == 0:
            target.commit()
            print(f'{datetime.now()} - Committed record {num_records}')

    print(f'{datetime.now()} - Committed record {num_records}')
    target.commit()
    target.close()

    source.close()


#### V. Run
Comment out the call to `run_8()` to verify the functionality for other ETLs.

In [24]:
def run_8():
    """Run Team 8's ETL steps in order: date keys, sales, daily roll-up, inventory."""
    pipeline = (
        build_data_structures_8,
        etl_team_8_sales,
        etl_team_8_sales_daily,
        etl_team_8_inventory,
    )
    for step in pipeline:
        step()


run_8()

2025-03-26 11:17:14.815845 - Started Query
2025-03-26 11:17:39.900069 - Started Insertions
2025-03-26 11:17:43.245875 - Committed record 1000000
2025-03-26 11:17:46.609100 - Committed record 2000000
2025-03-26 11:17:50.084940 - Committed record 3000000
2025-03-26 11:17:53.517059 - Committed record 4000000
2025-03-26 11:17:56.901990 - Committed record 5000000
2025-03-26 11:18:00.246070 - Committed record 6000000
2025-03-26 11:18:03.642544 - Committed record 7000000
2025-03-26 11:18:07.009823 - Committed record 8000000
2025-03-26 11:18:10.366536 - Committed record 9000000
2025-03-26 11:18:13.731551 - Committed record 10000000
2025-03-26 11:18:17.089201 - Committed record 11000000
2025-03-26 11:18:20.449657 - Committed record 12000000
2025-03-26 11:18:23.831899 - Committed record 13000000
2025-03-26 11:18:27.231002 - Committed record 14000000
2025-03-26 11:18:30.657679 - Committed record 15000000
2025-03-26 11:18:34.054169 - Committed record 16000000
2025-03-26 11:18:37.509044 - Committed

#

## 7. Team 9's ETL

#### I. Build the Data Structures Necessary to ETL from Team 9's Database

In [25]:
# Start from an empty map so keys from a previous team's date format do not linger.
DATE_KEYS = {}

def build_data_structures_9():
    """Fill DATE_KEYS with 'YYYY-MM-DD' -> day-of-year (1-based) for all of 2024."""
    new_year = date(2024, 1, 1)
    year_end = date(2024, 12, 31)
    span = (year_end - new_year).days

    calendar = (new_year + timedelta(days=step) for step in range(span + 1))
    DATE_KEYS.update(
        (day.strftime('%Y-%m-%d'), key)
        for key, day in enumerate(calendar, start=1)
    )

#### II. Sales

In [27]:
def etl_team_9_sales():
    """Load Team 9's sales into the data mart's `sales_transactions` table.

    The source rows are grouped by (date1, customerID, sku); every group
    becomes one fact row for StoreKey 9. Inserts are committed every
    1,000,000 rows and once more at the end.

    TotalDollarSales is rounded to cents, as in the Team 8 and Team 10
    loaders; previously the raw floating-point SUM() was stored.

    Needs DATE_KEYS and PRODUCTS_LOOKUP to be populated; an unknown date or
    sku raises KeyError.
    """
    db_handle_old = DB_HANDLES['DB_TEAM_9']
    db_handle_old.connect()

    sql_retrieve = 'SELECT date1, sku, customerID , COUNT(*), SUM(salePrice) ' \
                    'FROM transactions ' \
                    'GROUP BY date1, customerID , sku'

    db_handle_new = DB_HANDLES['DATA_MART']
    db_handle_new.connect()

    sql_insert = 'INSERT INTO sales_transactions VALUES (?, ?, ?, ?, ?, ?, ?, ?)'

    num_records = 0

    print(f'{datetime.now()} - Started Query')
    results = db_handle_old.execute_sql(sql_retrieve, options=db_options.RETURN_RESULTS)

    print(f'{datetime.now()} - Started Insertions')
    for row in results:
        DateKey = DATE_KEYS[row[0]]
        DailyCustomerNumber = row[2]
        ProductKey = PRODUCTS_LOOKUP[str(row[1])]['ProductKey']
        StoreKey = 9
        QuantitySold = row[3]
        # Round the summed prices so float noise is not stored as money.
        TotalDollarSales = round_money(row[4])
        TotalCostToStore = round_money(row[3] * PRODUCTS_LOOKUP[str(row[1])]['CostToStore'])
        GrossProfit = round_money((TotalDollarSales - TotalCostToStore))

        values = (DateKey, DailyCustomerNumber, ProductKey, StoreKey,
                  QuantitySold, TotalDollarSales, TotalCostToStore, GrossProfit)

        num_records += 1
        db_handle_new.execute_sql_values(sql_insert, values=values)

        if(num_records % 1000000 == 0):
            db_handle_new.commit()
            print(f'{datetime.now()} - Committed record {num_records}')

    print(f'{datetime.now()} - Committed record {num_records}')

    db_handle_new.commit()
    db_handle_new.close()

    db_handle_old.close()


#### III. Roll Sales Up

In [28]:
def etl_team_9_sales_daily():
    """Load Team 9's sales, rolled up per (date1, sku), into `sales_daily`.

    Every group becomes one fact row for StoreKey 9. Inserts are committed
    every 50,000 rows and once more at the end.

    The INSERT names its target columns explicitly. The previous positional
    insert stored the sales total in CostOfItemsSold and the cost in
    SalesTotal, i.e. the two columns were swapped.

    Needs DATE_KEYS and PRODUCTS_LOOKUP to be populated; an unknown date or
    sku raises KeyError.
    """
    db_handle_old = DB_HANDLES['DB_TEAM_9']
    db_handle_old.connect()

    sql_retrieve = 'SELECT date1, sku, COUNT(*), SUM(salePrice) ' \
                    'FROM transactions ' \
                    'GROUP BY date1, sku'

    db_handle_new = DB_HANDLES['DATA_MART']
    db_handle_new.connect()

    sql_insert = 'INSERT INTO sales_daily ' \
                 '(DateKey, ProductKey, StoreKey, QuantitySoldToday, ' \
                 'CostOfItemsSold, SalesTotal, GrossProfit) ' \
                 'VALUES (?, ?, ?, ?, ?, ?, ?)'

    num_records = 0

    print(f'{datetime.now()} - Started Query')
    results = db_handle_old.execute_sql(sql_retrieve, options=db_options.RETURN_RESULTS)

    print(f'{datetime.now()} - Started Insertions')
    for row in results:
        DateKey = DATE_KEYS[row[0]]
        ProductKey = PRODUCTS_LOOKUP[str(row[1])]['ProductKey']
        StoreKey = 9
        QuantitySold = row[2]
        TotalDollarSales = round_money(row[3])
        TotalCostToStore = round_money((row[2] * PRODUCTS_LOOKUP[str(row[1])]['CostToStore']))
        GrossProfit = round_money((TotalDollarSales - TotalCostToStore))

        # Order matches the column list in sql_insert: cost first, then sales.
        values = (DateKey, ProductKey, StoreKey,
                  QuantitySold, TotalCostToStore, TotalDollarSales, GrossProfit)

        num_records += 1
        db_handle_new.execute_sql_values(sql_insert, values=values)

        if(num_records % 50000 == 0):
            db_handle_new.commit()
            print(f'{datetime.now()} - Committed record {num_records}')

    print(f'{datetime.now()} - Committed record {num_records}')
    db_handle_new.commit()
    db_handle_new.close()

    db_handle_old.close()

#### IV. Inventory

In [29]:
def etl_team_9_inventory():
    """Load Team 9's per-day, per-sku inventory snapshot into `inventory_daily`.

    For each (date1, sku) group the source query returns the smallest
    itemsLeft and the largest `co` value (stored as NumCasesPurchasedToDate).
    CostToStoreItem is itemsLeft times the product cost; CostToStore is
    get_case_count(itemsLeft) times the product cost. Inserts are committed
    every 100,000 rows and once more at the end.

    Rows are written for StoreKey 9. The previous version used StoreKey 8,
    which filed Team 9's inventory under Team 8's store.

    Needs DATE_KEYS and PRODUCTS_LOOKUP to be populated; an unknown date or
    sku raises KeyError.
    """
    db_handle_old = DB_HANDLES['DB_TEAM_9']
    db_handle_old.connect()

    sql_retrieve = 'SELECT sku, date1, MIN(itemsLeft), MAX(co)' \
                    'FROM transactions ' \
                    'GROUP BY date1, sku;'

    print(f'{datetime.now()} - Started Query')
    results = db_handle_old.execute_sql(sql_retrieve, options=db_options.RETURN_RESULTS)
    print(f'{datetime.now()} - Started Insertions')

    db_handle_new = DB_HANDLES['DATA_MART']
    db_handle_new.connect()

    sql_insert = 'INSERT INTO inventory_daily VALUES (?, ?, ?, ?, ?, ?, ?)'

    num_records = 0
    for row in results:
        DateKey = DATE_KEYS[row[1]]
        ProductKey = PRODUCTS_LOOKUP[str(row[0])]['ProductKey']
        StoreKey = 9  # Team 9's store
        NumAvailable = row[2]
        CostToStoreItem = round_money((row[2]*get_product_cost(row[0])))
        CostToStore = round_money(get_case_count(row[2])*get_product_cost(row[0]))
        NumCasesPurchasedToDate = row[3]

        values = (DateKey, ProductKey, StoreKey, NumAvailable,
                  CostToStoreItem, CostToStore, NumCasesPurchasedToDate)

        num_records += 1
        db_handle_new.execute_sql_values(sql_insert, values=values)

        if(num_records % 100000 == 0):
            db_handle_new.commit()
            print(f'{datetime.now()} - Committed record {num_records}')

    print(f'{datetime.now()} - Committed record {num_records}')
    db_handle_new.commit()
    db_handle_new.close()

    db_handle_old.close()


In [32]:
def run_9():
    """Run Team 9's ETL steps in order: date keys, sales, daily roll-up, inventory."""
    steps = [
        build_data_structures_9,
        etl_team_9_sales,
        etl_team_9_sales_daily,
        etl_team_9_inventory,
    ]
    for run_step in steps:
        run_step()


run_9()

2025-03-26 11:25:04.481034 - Started Query
2025-03-26 11:25:24.425600 - Started Insertions
2025-03-26 11:25:27.447109 - Committed record 1000000
2025-03-26 11:25:30.481111 - Committed record 2000000
2025-03-26 11:25:33.470169 - Committed record 3000000
2025-03-26 11:25:36.486310 - Committed record 4000000
2025-03-26 11:25:39.530066 - Committed record 5000000
2025-03-26 11:25:42.553532 - Committed record 6000000
2025-03-26 11:25:45.555052 - Committed record 7000000
2025-03-26 11:25:48.570842 - Committed record 8000000
2025-03-26 11:25:51.590344 - Committed record 9000000
2025-03-26 11:25:54.614092 - Committed record 10000000
2025-03-26 11:25:57.616774 - Committed record 11000000
2025-03-26 11:26:00.676620 - Committed record 12000000
2025-03-26 11:26:03.705338 - Committed record 13000000
2025-03-26 11:26:06.306757 - Committed record 13854769
2025-03-26 11:26:07.064465 - Started Query
2025-03-26 11:26:17.936314 - Started Insertions
2025-03-26 11:26:18.123656 - Committed record 50000
2025-

#

## 8. Team 10's ETL

#### I. Build the data structures necessary for Team 10's ETL

In [33]:
# Start from an empty map: Team 10's dates use a different string format.
DATE_KEYS = {}

def build_data_structures_10():
    """Fill DATE_KEYS with 'YYYYMMDD' -> day-of-year (1-based) for all of 2024."""
    one_day = timedelta(days=1)
    stop_after = date(2024, 12, 31)

    day = date(2024, 1, 1)
    next_key = 1
    while not day > stop_after:
        DATE_KEYS[day.strftime('%Y%m%d')] = next_key
        day = day + one_day
        next_key = next_key + 1

#### II. Sales

In [34]:
def etl_team_10_sales():
    """Load Team 10's sales into the data mart's `sales_transactions` table.

    The source rows are grouped by (date, customer_number, sku); every group
    becomes one fact row for StoreKey 10. Inserts are committed every
    1,000,000 rows and once more at the end.

    Needs DATE_KEYS and PRODUCTS_LOOKUP to be populated; an unknown date or
    sku raises KeyError.
    """
    team_db = DB_HANDLES['DB_TEAM_10']
    team_db.connect()

    sql_retrieve = 'SELECT date, sku, customer_number, COUNT(*), SUM(salesPrice )' \
            'FROM sales_transactions GROUP BY date, customer_number, sku'

    mart = DB_HANDLES['DATA_MART']
    mart.connect()

    sql_insert = 'INSERT INTO sales_transactions VALUES (?, ?, ?, ?, ?, ?, ?, ?)'

    print(f'{datetime.now()} - Started Query')
    groups = team_db.execute_sql(sql_retrieve, options=db_options.RETURN_RESULTS)

    print(f'{datetime.now()} - Started Insertions')
    num_records = 0
    for num_records, group in enumerate(groups, start=1):
        sale_date, sku, customer_number, line_count, sales_sum = group

        date_key = DATE_KEYS[sale_date]
        product_key = PRODUCTS_LOOKUP[str(sku)]['ProductKey']
        quantity = round_money(line_count)
        dollars = round_money(sales_sum)
        cost = round_money(line_count * get_product_cost(sku))
        profit = round(dollars - cost, 2)

        mart.execute_sql_values(
            sql_insert,
            values=(date_key, customer_number, product_key, 10,
                    quantity, dollars, cost, profit))

        # Commit in large batches to bound the size of the open transaction.
        if num_records % 1000000 == 0:
            mart.commit()
            print(f'{datetime.now()} - Committed record {num_records}')

    print(f'{datetime.now()} - Committed record {num_records}')
    mart.commit()
    mart.close()

    team_db.close()

#### III. Roll Sales Up

In [35]:
def etl_team_10_sales_daily():
    """Load Team 10's sales, rolled up per (date, sku), into `sales_daily`.

    Every group becomes one fact row for StoreKey 10. Inserts are committed
    every 50,000 rows and once more at the end.

    The INSERT names its target columns explicitly. The previous positional
    insert stored the sales total in CostOfItemsSold and the cost in
    SalesTotal, i.e. the two columns were swapped.

    Needs DATE_KEYS and PRODUCTS_LOOKUP to be populated; an unknown date or
    sku raises KeyError.
    """
    db_handle_old = DB_HANDLES['DB_TEAM_10']
    db_handle_old.connect()

    sql_retrieve = 'SELECT date, sku, COUNT(*), SUM(salesPrice )' \
                    'FROM sales_transactions ' \
                    'GROUP BY date, sku'

    db_handle_new = DB_HANDLES['DATA_MART']
    db_handle_new.connect()

    sql_insert = 'INSERT INTO sales_daily ' \
                 '(DateKey, ProductKey, StoreKey, QuantitySoldToday, ' \
                 'CostOfItemsSold, SalesTotal, GrossProfit) ' \
                 'VALUES (?, ?, ?, ?, ?, ?, ?)'

    num_records = 0

    print(f'{datetime.now()} - Started Query')
    results = db_handle_old.execute_sql(sql_retrieve, options=db_options.RETURN_RESULTS)

    print(f'{datetime.now()} - Started Insertions')
    for row in results:
        DateKey = DATE_KEYS[row[0]]
        ProductKey = PRODUCTS_LOOKUP[str(row[1])]['ProductKey']
        StoreKey = 10
        QuantitySold = row[2]
        TotalDollarSales = round_money(row[3])
        TotalCostToStore = round_money((row[2] * get_product_cost(row[1])))
        GrossProfit = round_money((TotalDollarSales - TotalCostToStore))

        # Order matches the column list in sql_insert: cost first, then sales.
        values = (DateKey, ProductKey, StoreKey,
                  QuantitySold, TotalCostToStore, TotalDollarSales, GrossProfit)

        num_records += 1
        db_handle_new.execute_sql_values(sql_insert, values=values)

        if(num_records % 50000 == 0):
            db_handle_new.commit()
            print(f'{datetime.now()} - Committed record {num_records}')

    print(f'{datetime.now()} - Committed record {num_records}')
    db_handle_new.commit()
    db_handle_new.close()

    db_handle_old.close()

#### IV. Inventory

In [36]:
def etl_team_10_inventory():
    """Load Team 10's per-day inventory figures into the data mart's ``inventory_daily``.

    The source query groups ``sales_transactions`` by date and sku and takes the
    minimum ``items_left`` and maximum ``cases_ordered`` of each group. Each row is
    translated to data-mart keys, two cost figures are derived through
    ``get_product_cost`` / ``get_case_count`` / ``round_money``, and the result is
    inserted. Inserts are committed every 100,000 rows and once more at the end.
    """
    store_key = 10  # every row loaded by this function is attributed to store 10

    source_db = DB_HANDLES['DB_TEAM_10']
    source_db.connect()

    sql_retrieve = (
        'SELECT sku, date, MIN(items_left), MAX(cases_ordered)'
        'FROM sales_transactions '
        'GROUP BY date, sku;'
    )

    print(f'{datetime.now()} - Started Query')
    inventory_rows = source_db.execute_sql(sql_retrieve, options=db_options.RETURN_RESULTS)
    print(f'{datetime.now()} - Started Insertions')

    mart_db = DB_HANDLES['DATA_MART']
    mart_db.connect()

    sql_insert = 'INSERT INTO inventory_daily VALUES (?, ?, ?, ?, ?, ?, ?)'

    num_records = 0
    for num_records, inventory_row in enumerate(inventory_rows, start=1):
        # Source columns: sku, date, MIN(items_left), MAX(cases_ordered).
        date_key = DATE_KEYS[inventory_row[1]]
        product_key = PRODUCTS_LOOKUP[str(inventory_row[0])]['ProductKey']
        num_available = inventory_row[2]
        cost_to_store_item = round_money((inventory_row[2] * get_product_cost(inventory_row[0])))
        cost_to_store = round_money(
            get_case_count(inventory_row[2]) * get_product_cost(inventory_row[0])
        )
        cases_purchased_to_date = inventory_row[3]

        mart_db.execute_sql_values(
            sql_insert,
            values=(
                date_key,
                product_key,
                store_key,
                num_available,
                cost_to_store_item,
                cost_to_store,
                cases_purchased_to_date,
            ),
        )

        # Commit in batches so progress is visible in the cell output.
        if num_records % 100000 == 0:
            mart_db.commit()
            print(f'{datetime.now()} - Committed record {num_records}')

    print(f'{datetime.now()} - Committed record {num_records}')
    mart_db.commit()
    mart_db.close()

    source_db.close()


In [38]:
def run_10():
    """Run the Team 10 ETL steps that are currently enabled.

    Steps are switched on and off by (un)commenting the calls below. As written,
    only the daily sales roll-up runs.
    """
    #build_data_structures_10()
    #etl_team_10_sales()
    etl_team_10_sales_daily()
    #etl_team_10_inventory()

# Execute the enabled Team 10 steps when this cell is run.
run_10()

2025-03-26 11:59:02.834330 - Started Query
2025-03-26 11:59:13.054409 - Started Insertions
2025-03-26 11:59:13.249003 - Committed record 50000
2025-03-26 11:59:13.435001 - Committed record 100000
2025-03-26 11:59:13.621820 - Committed record 150000
2025-03-26 11:59:13.816015 - Committed record 200000
2025-03-26 11:59:14.001534 - Committed record 250000
2025-03-26 11:59:14.213034 - Committed record 300000
2025-03-26 11:59:14.403708 - Committed record 350000
2025-03-26 11:59:14.568290 - Committed record 400000
2025-03-26 11:59:14.773079 - Committed record 450000
2025-03-26 11:59:14.959373 - Committed record 500000
2025-03-26 11:59:15.156369 - Committed record 550000
2025-03-26 11:59:15.339051 - Committed record 600000
2025-03-26 11:59:15.531977 - Committed record 650000
2025-03-26 11:59:15.723281 - Committed record 700000
2025-03-26 11:59:15.862112 - Committed record 739749


#

In [None]:
def roll_up_sales_transactions():
    """Aggregate the data mart's ``sales_transactions`` into ``sales_daily``.

    Calls ``build_table('sales_daily')`` on the data-mart handle (presumably
    (re)creating the table; confirm against the handle class), then sums quantity,
    dollar sales, cost and gross profit per (store, product, date) and inserts one
    row per group. Each summed value is rounded to two decimal places. Inserts are
    committed every 100,000 rows and once more at the end.
    """
    db_handle_new = DB_HANDLES['DATA_MART']
    db_handle_new.connect()

    db_handle_new.build_table('sales_daily')

    sql_retrieve = \
                'SELECT DateKey, ProductKey, StoreKey, SUM(QuantitySold), SUM(TotalDollarSales), SUM(TotalCostToStore), SUM(GrossProfit) ' \
                'FROM sales_transactions ' \
                'GROUP BY StoreKey, ProductKey, DateKey '\
                'ORDER BY 1, 2, 3;'

    sql_insert = 'INSERT INTO sales_daily VALUES (?, ?, ?, ?, ?, ?, ?)'

    print(f'{datetime.now()} - Started Query')
    results = db_handle_new.execute_sql(sql_retrieve, options=db_options.RETURN_RESULTS)
    print(f'{datetime.now()} - Started Insertions')

    num_records = 0

    for row in results:
        # Result columns follow the SELECT list: keys first, then the four sums.
        DateKey = row[0]
        ProductKey = row[1]
        StoreKey = row[2]
        QuantitySold = round(row[3], 2)
        TotalDollarSales = round(row[4], 2)
        TotalCostToStore = round(row[5], 2)
        GrossProfit = round(row[6], 2)

        values = (DateKey, ProductKey, StoreKey, QuantitySold, TotalDollarSales, TotalCostToStore, GrossProfit)

        db_handle_new.execute_sql_values(sql_insert, values=values)

        num_records += 1
        if num_records % 100000 == 0:
            # Commit on the handle that received the inserts. The previous code
            # called `db_handle.commit()`, a name this function never defines.
            db_handle_new.commit()
            print(f'{datetime.now()} - Committed record {num_records}')

    print(f'{datetime.now()} - Committed record {num_records}')
    db_handle_new.commit()
    db_handle_new.close()


roll_up_sales_transactions()


#

## 10. Roll Up Inventory From Inventory Daily to Quarterly Snapshots

#### I. Fill in gaps (i.e. missing Date-Store-Product tuples)
This takes about an hour for the 3 databases; no way to optimize it has been found.

In [None]:
def fill_gaps():
    """Forward-fill missing (store, date, product) rows in ``inventory_daily``.

    The gap query cross-joins ``date``, ``product`` and ``store`` (restricted to
    StoreKey = 8) and keeps the combinations that have no ``inventory_daily`` row,
    ordered by store, date, product. For each gap the row for the previous DateKey
    (``date_key - 1``) is looked up; when one exists, its values are copied into a
    new row carrying the gap's DateKey. Everything is committed once at the end.
    """
    mart = DB_HANDLES['DATA_MART']
    mart.connect()

    sql_find_gaps = (
        'WITH date_store_product_tuples AS ('
            'SELECT StoreKey, DateKey, ProductKey '
            'FROM date '
                'CROSS JOIN product '
                'CROSS JOIN store '
            'WHERE StoreKey = 8'
        ') '
        'SELECT * '
        'FROM date_store_product_tuples AS d '
        'LEFT JOIN inventory_daily AS i ON '
            '((d.StoreKey = i.StoreKey) AND (d.DateKey = i.DateKey) AND (d.ProductKey = i.ProductKey))'
        'WHERE ((i.StoreKey IS NULL) AND (i.DateKey IS NULL) AND (i.ProductKey IS NULL)) '
        'ORDER BY d.StoreKey, d.DateKey, d.ProductKey'
    )

    sql_find_prev = 'SELECT * FROM inventory_daily WHERE (StoreKey = ? AND DateKey = ? AND ProductKey = ?)'

    sql_insert = 'INSERT INTO inventory_daily VALUES (?, ?, ?, ?, ?, ?, ?)'

    print(f'{datetime.now()} - Started Query')
    gaps = mart.execute_sql(sql_find_gaps, options=db_options.RETURN_RESULTS)
    print(f'{datetime.now()} - Found {len(gaps)} Gaps')

    # For each gap, go back to the previous day and copy its values.
    # Gaps are processed in date order, so the previous day is expected to have
    # been filled already (presumably visible through this same handle).
    # Gaps stay unfilled for 2024-01-01 and for any run of consecutive days on
    # which a given product has no earlier row to copy from.
    for gap in gaps:
        # The first three columns come from the cross-joined side of the query.
        store_key, date_key, product_key = gap[0], gap[1], gap[2]

        previous_day = mart.execute_sql_values(
            sql_find_prev,
            values=(store_key, (date_key - 1), product_key),
            options=db_options.RETURN_RESULTS,
        )

        if len(previous_day) == 0:
            continue

        prior = previous_day[0]
        # Same values as the previous day, re-dated to the missing day.
        carried_forward = (date_key, prior[1], prior[2], prior[3], prior[4], prior[5], prior[6])
        mart.execute_sql_values(sql_insert, carried_forward)

    print(f'{datetime.now()} - Filled gaps')

    mart.commit()
    mart.close()

fill_gaps()

#### II. Build Quarterly Snapshots

    'inventory_daily' : \
            'CREATE TABLE inventory_daily(' \
                    'DateKey INT, ' \
                    'ProductKey INT, ' \
                    'StoreKey INT, ' \
                    'NumAvailable INT, '
                    'CostToStoreItem FLOAT, ' \
                    'CostToStore FLOAT, ' \
                    'NumCasesPurchasedToDate INT)', 

    'inventory_quarterly' : \
            'CREATE TABLE inventory_quarterly(' \
                    'ProductKey INT, ' \
                    'StoreKey INT, ' \
                    'Quarter INT, ' \
                    'Year INT, ' \
                    'CasesPurchasedToDate INT, ' \
                    'CasesPurchasedThisQuarter INT, ' \
                    'CasesOnHand INT, ' \
                    'TotalCostToStoreThisQuarter FLOAT, ' \
                    'TotalSoldByStoreThisQuarter FLOAT, ' \
                    'TotalCostToStoreThisYTD FLOAT, ' \
                    'TotalSoldByStoreThisYTD FLOAT)'

In [None]:
def build_quarterly_snapshots():
    """Query quarter-end inventory snapshots joined with sales totals and print them.

    Work in progress: the function builds one tuple per result row and prints it;
    nothing is inserted into any table yet.

    The query has three parts:
      * ``selected_dates``     - the DateKey/Quarter/Year of the four 2024 quarter-end dates.
      * ``selected_snapshots`` - the ``inventory_daily`` rows on those dates.
      * ``sales_ytd``          - per store/product/quarter sums from ``sales_daily``, plus
                                 running totals over all quarters up to and including the
                                 current one (self-join on ``c2.Quarter <= c1.Quarter``).
    """
    db_handle_new = DB_HANDLES['DATA_MART']
    db_handle_new.connect()

    # NOTE(review): sales_ytd groups by Quarter only (not Year), so the running totals
    #   are only "year to date" while the data covers a single year - confirm.
    # NOTE(review): the query reads CostOfItemsSold and QuantitySoldToday from sales_daily;
    #   confirm these match the column names in the sales_daily table definition.
    sql_selected_inventory =    'WITH selected_dates AS (' \
                                    'SELECT DateKey, Quarter, Year ' \
                                    'FROM date ' \
                                    'WHERE PrettyDate IN (\'2024-03-31\', \'2024-06-30\', \'2024-09-30\', \'2024-12-31\') ' \
                                '), selected_snapshots AS (' \
                                    'SELECT * ' \
                                    'FROM inventory_daily '\
                                    'JOIN selected_dates USING (DateKey) ' \
                                '), '\
                                '''
                                    sales_ytd AS(
                                        WITH calc1 AS (
                                            SELECT 
                                                StoreKey, 
                                                Quarter, 
                                                ProductKey, 
                                                SUM(CostOfItemsSold) AS cost, 
                                                SUM(QuantitySoldToday) AS count
                                            FROM sales_daily
                                            JOIN date USING (DateKey)
                                            GROUP BY StoreKey, Quarter, ProductKey
                                        ), 
                                        ytd_calc AS (
                                            SELECT
                                                c1.StoreKey AS StoreKey,
                                                c1.ProductKey AS ProductKey,
                                                c1.Quarter AS Quarter,
                                                c1.cost,
                                                c1.count, 
                                                SUM(c2.cost) AS YTD_CostOfItemsSold,
                                                SUM(c2.count) AS YTD_QuantitySold
                                            FROM calc1 c1
                                            JOIN calc1 c2 ON c1.StoreKey = c2.StoreKey AND c1.ProductKey = c2.ProductKey AND c2.Quarter <= c1.Quarter
                                            GROUP BY c1.StoreKey, c1.ProductKey, c1.Quarter
                                        )
                                        SELECT * FROM ytd_calc
                                    ) 
                                '''\
                                '''SELECT * 
                                    FROM selected_snapshots AS ss
                                    JOIN sales_ytd AS sy ON ((ss.StoreKey = sy.StoreKey) AND (ss.Quarter = sy.Quarter) AND (ss.ProductKey = sy.ProductKey));'''
    
    print(f'{datetime.now()} - Started Query')
    results = db_handle_new.execute_sql(sql_selected_inventory, options=db_options.RETURN_RESULTS)
    
    # Row layout produced by the final SELECT * (assuming inventory_daily has its seven
    # columns in CREATE TABLE order - TODO confirm):
    #    0-6   inventory_daily: DateKey, ProductKey, StoreKey, NumAvailable,
    #          CostToStoreItem, CostToStore, NumCasesPurchasedToDate
    #    7-8   selected_dates:  Quarter, Year
    #    9-15  sales_ytd:       StoreKey, ProductKey, Quarter, cost, count,
    #          YTD_CostOfItemsSold, YTD_QuantitySold
    for row in results:
        ProductKey = row[1]
        StoreKey = row[2]
        QuarterAndYear = f'Q{row[7]} {row[8]}'
        Quarter = row[7]
        Year = row[8]  # NOTE(review): assigned but not included in `values` below
        CasesPurchasedToDate = row[6]
        CasesPurchasedThisQuarter = 0 # TODO: not computed yet; 0 is a placeholder
        CasesOnHand = (row[3] // 12)  # presumably 12 items per case - TODO confirm
        TotalCostToStoreThisQuarter = round(row[12], 2)
        TotalSoldByStoreThisQuarter = row[13]
        TotalCostToStoreThisYTD = round(row[14], 2)
        TotalSoldByStoreThisYTD = row[15]

        # NOTE(review): the inventory_quarterly definition lists (ProductKey, StoreKey,
        #   Quarter, Year, ...), but this tuple carries (QuarterAndYear, Quarter) in those
        #   positions - reconcile before turning the print into an INSERT.
        values = (
            ProductKey,
            StoreKey,
            QuarterAndYear,
            Quarter,
            CasesPurchasedToDate,
            CasesPurchasedThisQuarter,
            CasesOnHand,
            TotalCostToStoreThisQuarter,
            TotalSoldByStoreThisQuarter,
            TotalCostToStoreThisYTD, 
            TotalSoldByStoreThisYTD
        )
        
        print(values)
    
    db_handle_new.commit()
    db_handle_new.close()


build_quarterly_snapshots()

In [None]:
def testbed():
    """Scratch cell: run the ``sales_ytd`` query on its own and print every row.

    The query sums ``CostOfItemsSold`` and ``QuantitySoldToday`` from ``sales_daily``
    per store/quarter/product, then adds running totals over all quarters up to and
    including the current one.
    """
    mart = DB_HANDLES['DATA_MART']
    mart.connect()

    sql_selected_inventory =    '''
                                    WITH sales_ytd AS(
                                        WITH calc1 AS (
                                            SELECT 
                                                StoreKey, 
                                                Quarter, 
                                                ProductKey, 
                                                SUM(CostOfItemsSold) AS cost, 
                                                SUM(QuantitySoldToday) AS count
                                            FROM sales_daily
                                            JOIN date USING (DateKey)
                                            GROUP BY StoreKey, Quarter, ProductKey
                                        ), 
                                        ytd_calc AS (
                                            SELECT
                                                c1.StoreKey AS StoreKey,
                                                c1.ProductKey AS ProductKey,
                                                c1.Quarter AS Quarter,
                                                c1.cost,
                                                c1.count, 
                                                SUM(c2.cost) AS YTD_CostOfItemsSold,
                                                SUM(c2.count) AS YTD_QuantitySold
                                            FROM calc1 c1
                                            JOIN calc1 c2 ON c1.StoreKey = c2.StoreKey AND c1.ProductKey = c2.ProductKey AND c2.Quarter <= c1.Quarter
                                            GROUP BY c1.StoreKey, c1.ProductKey, c1.Quarter
                                        )
                                        SELECT * FROM ytd_calc
                                    ) SELECT * FROM sales_ytd
                                '''

    print(f'{datetime.now()} - Started Query')
    ytd_rows = mart.execute_sql(sql_selected_inventory, options=db_options.RETURN_RESULTS)

    # Field mapping sketched for a later version (not implemented here):
    #   ProductKey = row[1]
    #   StoreKey = row[2]
    #   QuarterAndYear = f'Q{(row[7] / 3)} 2024'
    #   Quarter = (row[7] / 3)
    #   TotalCostToStoreThisQuarter
    #   TotalSoldByStoreThisQuarter
    #   TotalCostToStoreThisYTD
    #   TotalSoldByStoreThisYTD
    for ytd_row in ytd_rows:
        print(ytd_row)

    mart.commit()
    mart.close()


testbed()