## Database Build Script v3
## Anthony Ung, Sean Jerzewski, Gideon Kipkorir

Any new modifications to the database will happen in this Notebook.

Modifications to Chelsea Cantone's Code
1. Changed from using Postgres to SQLite3.
2. Created classes of functions to delineate separate modules.
3. Created classes of products

In [1]:
import sqlite3 as lite
import csv
from datetime import datetime, date, timedelta
from decimal import Decimal
import random

'''
    If you are building the grocery database, 
        you should only touch
        ARGS, TABLE_DEFINITIONS, and params.

    If you get an error message, 
        set all three values in ARGS to True
        and then re-run the script.

    It took about 10 minutes
        on Anthony Ung's Thinkpad P14s
        with Ryzen 7 8840HS, 32GB RAM, and 4TB 990 Pro

    The third arg was useful in a later HW when we did some profiling
'''

# Build switches; each flag gates one stage executed by run():
#   'Create Products Table'                -> create_products_table()
#   'Populate Sales Transactions Data'     -> simulate.run()
#   'Populate Customers Transactions Data' -> customers_transactions_table.build()
# NOTE: New_Products.populate_lists() is only called at the end of
# create_products_table(), after its early return, so with the first flag set
# to False the in-memory product lists stay empty for the sales simulation.
ARGS = {
    'Create Products Table': True,
    'Populate Sales Transactions Data': True,
    'Populate Customers Transactions Data': False
}

# CREATE TABLE statements keyed by table name. Each build stage first issues
# DROP TABLE IF EXISTS for its table and then executes the statement below.
TABLE_DEFINITIONS = {
    'products': 'CREATE TABLE products(sku INT, product_name TEXT, product_type TEXT, manufacturer TEXT)',
    'transactions_sales': 'CREATE TABLE transactions_sales(date TEXT, customerNumber INT, sku INT, salesPrice REAL)',
    'transactions_customers': 'CREATE TABLE transactions_customers(date VARCHAR(8), customerNumber INT, total FLOAT)'
}


class params:
    """Tunable constants for the simulation, grouped into nested namespaces."""
    
    class group:
        # Factor applied to each product's BasePrice when the product lists are loaded.
        price_multiplier = 1.2
        # Inclusive bounds for the random number of customers simulated per day.
        customers_low = 1020
        customers_high = 1060
        # Added to both customer bounds when date.weekday() >= 5 (Saturday/Sunday).
        weekend_increase = 75
        # Upper bound of random.randint(1, maximum_items) for a customer's target item count.
        maximum_items = 90
        
    class simulation:
        # First and last simulated day; the day loop treats both as inclusive.
        start_date = date(2024, 1, 1)
        end_date = date(2024, 12, 31)
        
    class debug:
        # A progress line is printed whenever the day counter is a multiple of this value.
        display_daily_commits = 14

        
class db:
    """Shared handle on the SQLite database file ``store.db``.

    The class is used as a namespace: its functions are called on the class
    itself (``db.connect()``, ``db.commit()``, ...) and the connection state
    is kept in class attributes.
    """

    con = None  # sqlite3 connection; set by connect()
    cur = None  # cursor on ``con``; set by connect()
    commit_pending = 0  # counter incremented by callers; reset to 0 by commit()

    @staticmethod
    def connect():
        """Open ``store.db`` and create the shared cursor."""
        db.con = lite.connect(r'store.db')
        db.cur = db.con.cursor()
        print('Database Successfully Connected To')

    @staticmethod
    def execute_sql(sql):
        """Execute a parameterless SQL statement on the shared cursor.

        Raises:
            AssertionError: if ``sql`` is not a string.
        """
        # The message reports the offending type directly; calling print()
        # inside the f-string would interpolate None instead.
        assert isinstance(sql, str), \
            f"Error! This function expected a string. Got {type(sql)} instead"
        db.cur.execute(sql)

    @staticmethod
    def execute_sql_values(sql, values):
        """Execute a parameterised SQL statement on the shared cursor.

        Args:
            sql: statement text containing ``?`` placeholders.
            values: tuple of values bound to the placeholders.

        Raises:
            AssertionError: if ``sql`` is not a string or ``values`` is not a tuple.
        """
        assert isinstance(sql, str), \
            f"Error! This function expected a string. Got {type(sql)} instead"
        assert isinstance(values, tuple), \
            f"Error! This function expected a tuple. Got {type(values)} instead"
        db.cur.execute(sql, values)

    @staticmethod
    def commit():
        """Commit the open transaction and reset ``commit_pending``."""
        db.con.commit()
        db.commit_pending = 0

    @staticmethod
    def close():
        """Commit any outstanding work, then close the connection."""
        db.con.commit()
        db.con.close()
        print('Database Connection Closed')


class db_debug():
    """Ad-hoc query helpers that print every result row.

    Author's note: the db_debug class did not play nicely with the db class;
    even after db.connect() the author still got "database is closed" errors.
    Each call therefore opens (and closes) its own connection to ``store.db``,
    since these helpers are meant to be used very rarely.
    """

    @staticmethod
    def _run_and_print(sql, values):
        """Run ``sql`` with ``values`` on a private connection and print each row."""
        con = lite.connect(r'store.db')
        try:
            cur = con.cursor()
            for row in cur.execute(sql, values).fetchall():
                print(row)
        finally:
            # Close even when the query raises, so a failed debug query does
            # not leave a connection open.
            con.close()

    @staticmethod
    def execute_sql(sql):
        """Run a parameterless query and print its rows.

        Raises:
            AssertionError: if ``sql`` is not a string.
        """
        assert isinstance(sql, str), \
            f"Error! This function expected a string. Got {type(sql)} instead"
        db_debug._run_and_print(sql, ())

    @staticmethod
    def execute_sql_values(sql, values):
        """Run a parameterised query and print its rows.

        Args:
            sql: statement text containing ``?`` placeholders.
            values: tuple of values bound to the placeholders.

        Raises:
            AssertionError: if ``sql`` is not a string or ``values`` is not a tuple.
        """
        # Report the offending type directly; print() inside the f-string
        # would interpolate None into the message.
        assert isinstance(sql, str), \
            f"Error! This function expected a string. Got {type(sql)} instead"
        assert isinstance(values, tuple), \
            f"Error! This function expected a tuple. Got {type(values)} instead"
        db_debug._run_and_print(sql, values)
        

def create_products_table():
    """Rebuild the ``products`` table from ``Products1.txt``.

    Does nothing (beyond printing a notice) when the 'Create Products Table'
    switch in ARGS is off. Otherwise the table is dropped and re-created, one
    row is inserted per line of the pipe-delimited file, and finally the
    in-memory product lists are loaded via New_Products.populate_lists().
    """
    if not ARGS['Create Products Table']:
        print("You don't want to create the Products table")
        return

    db.execute_sql('DROP TABLE IF EXISTS products')
    db.execute_sql(TABLE_DEFINITIONS['products'])
    db.commit()

    # Pipe-delimited, no quoting; registered under the name 'piper'.
    csv.register_dialect('piper', delimiter='|', quoting=csv.QUOTE_NONE)

    insert_statement = 'insert into products values (?, ?, ?, ?)'
    source_columns = ('SKU', 'Product Name', 'itemType', 'Manufacturer')

    with open('Products1.txt', 'r') as products_file:
        inserted = 0  # stays 0 when the file has no data rows
        reader = csv.DictReader(products_file, dialect='piper')

        for inserted, record in enumerate(reader, start=1):
            db.execute_sql_values(
                sql=insert_statement,
                values=tuple(record.get(column) for column in source_columns),
            )
            # Commit in batches of 10,000 rows.
            if inserted % 10000 == 0:
                db.commit()
                print(f"Committed {inserted} products")

        # Flush whatever is left after the last full batch.
        db.commit()
        print(f"Committed {inserted} products")

    New_Products.populate_lists()

class Product:
    '''
        A single product record: its name, item type, SKU and price.
    '''
    def __init__(self, p_name, p_type, sku, price):
        self.p_name, self.p_type = p_name, p_type
        self.sku, self.price = sku, price

    def __str__(self):
        # Rendered as "name - type - sku - price".
        return '{} - {} - {} - {}'.format(
            self.p_name, self.p_type, self.sku, self.price
        )


class New_Products:
    """In-memory product catalogue, bucketed by purchase category.

    Used as a namespace: ``products``, ``select`` and ``populate_lists`` are
    all accessed on the class itself. ``products`` maps a category key to the
    list of Product objects in that category.
    """
    # Kept inside the class body so that Enum is available to TYPE below
    # without relying on a module-level import.
    from enum import Enum
    import random

    # Every category key of ``products``. Single source of truth for both the
    # initial value and the reset performed by populate_lists().
    _CATEGORIES = (
        'milk', 'cereal', 'baby food', 'diapers',
        'bread', 'peanut butter', 'jelly jam', 'other',
    )

    # itemType value in Products1.txt -> category key. Anything not listed
    # here is filed under 'other'.
    _CATEGORY_BY_ITEM_TYPE = {
        'Milk': 'milk',
        'Cereal': 'cereal',
        'Baby Food': 'baby food',
        'Diapers': 'diapers',
        'Bread': 'bread',
        'Peanut Butter': 'peanut butter',
        'Jelly/Jam': 'jelly jam',
    }

    products = {category: [] for category in _CATEGORIES}

    class TYPE(Enum):
        OTHER = 'other'
        MILK = 'milk'
        CEREAL = 'cereal'
        BABY_FOOD = 'baby food'
        DIAPERS = 'diapers'
        BREAD = 'bread'
        PEANUT_BUTTER = 'peanut butter'
        JELLY_JAM = 'jelly jam'

    @staticmethod
    def select(p_type):
        """Return a random product from the category named by ``p_type``.

        Args:
            p_type: a New_Products.TYPE member.

        Raises:
            AssertionError: if ``p_type`` is not a New_Products.TYPE member.
            IndexError: if the category list is empty (random.choice on an
                empty sequence), e.g. when populate_lists() has not run.
        """
        assert isinstance(p_type, New_Products.TYPE), f'Incorrect type for New_Products.select(). {type(p_type)} received.'
        return random.choice(New_Products.products[p_type.value])

    @staticmethod
    def populate_lists():
        """Reload ``products`` from ``Products1.txt``.

        Does nothing (beyond printing a notice) when the 'Create Products
        Table' switch in ARGS is off. Each product's BasePrice has its '$'
        stripped and is multiplied by params.group.price_multiplier.
        """
        if not ARGS['Create Products Table']:
            print("You don't want to create the Products table")
            return

        # Jupyter keeps the lists in memory between cell runs, so start from
        # fresh lists to avoid the same product appearing multiple times.
        New_Products.products = {category: [] for category in New_Products._CATEGORIES}

        # Register the dialect here as well so this function does not depend
        # on create_products_table() having registered it first. Registering
        # the same name again simply replaces the previous registration.
        csv.register_dialect('piper', delimiter='|', quoting=csv.QUOTE_NONE)

        with open('Products1.txt', 'r') as csvfile:
            for row in csv.DictReader(csvfile, dialect='piper'):
                product_type = row.get('itemType')

                price = float(Decimal(row.get('BasePrice').strip('$')))
                price = price * params.group.price_multiplier

                current_product = Product(
                    p_name=row.get('Product Name'),
                    p_type=product_type,
                    sku=row.get('SKU'),
                    price=price,
                )

                category = New_Products._CATEGORY_BY_ITEM_TYPE.get(product_type, 'other')
                New_Products.products[category].append(current_product)

        print('Products in memory successfully populated.')
        

class simulate:
    """Generates the ``transactions_sales`` table by simulating daily shoppers.

    Used as a namespace: all functions are called on the class itself.
    """
    num_days = 0  # number of days simulated so far; drives the progress output
    start_date = params.simulation.start_date
    end_date = params.simulation.end_date

    @staticmethod
    def run():
        """Rebuild ``transactions_sales`` for every day from start_date to end_date inclusive.

        Does nothing (beyond printing a notice) when the 'Populate Sales
        Transactions Data' switch in ARGS is off.
        """
        if not ARGS['Populate Sales Transactions Data']:
            print("You don't want to populate the Sales Transactions table")
            return

        db.execute_sql('DROP TABLE IF EXISTS transactions_sales')
        db.execute_sql(TABLE_DEFINITIONS['transactions_sales'])
        db.commit()

        current_date = simulate.start_date
        while current_date <= simulate.end_date:
            simulate.simulate_one_day(current_date)
            current_date += timedelta(1)

    @staticmethod
    def simulate_one_day(current_date):
        """Simulate every customer of one day and commit the day's rows.

        Args:
            current_date: the ``datetime.date`` being simulated.

        Raises:
            AssertionError: if ``current_date`` is not a date.
        """
        assert isinstance(current_date, date), \
            f"Error! This function expected a date. Got {type(current_date)} instead"

        simulate.num_days += 1

        # weekday() is 5 for Saturday and 6 for Sunday.
        increase = params.group.weekend_increase if current_date.weekday() >= 5 else 0

        date_str = current_date.strftime('%Y-%m-%d')

        daily_customers = random.randint(params.group.customers_low + increase,
                                         params.group.customers_high + increase)

        # Customer numbers are 1-based and restart every day.
        for customer_number in range(1, daily_customers + 1):
            simulate.simulate_one_customer(date_str, customer_number)

        # Progress is reported on the first day, the last day, and every
        # display_daily_commits-th day in between.
        if (simulate.num_days % params.debug.display_daily_commits == 0) \
            or (current_date == simulate.start_date) \
            or (current_date == simulate.end_date):

            print(f'{datetime.now()} - {date_str} - {db.commit_pending} records created and committing')
        db.commit()

    @staticmethod
    def _chance(percent):
        """Return True with a probability of ``percent`` out of 100."""
        return random.randint(1, 100) <= percent

    @staticmethod
    def simulate_one_customer(date_str, customer_number):
        """Simulate one customer's basket and write one sales row per item.

        Paired categories are bought together with the probabilities coded
        below (milk/cereal, baby food/diapers, peanut butter/jelly-jam); bread
        is independent. The basket is then topped up with 'other' products
        until it reaches a random target size of 1..maximum_items. The paired
        purchases may already exceed that target, in which case nothing is
        added.

        Args:
            date_str: the day being simulated, as text.
            customer_number: the customer's number within that day.

        Raises:
            AssertionError: if ``date_str`` is not a str or ``customer_number``
                is not an int.
        """
        assert isinstance(date_str, str), \
            f"Error! This function expected a string. Got {type(date_str)} instead"
        assert isinstance(customer_number, int), \
            f"Error! This function expected an int. Got {type(customer_number)} instead"

        kinds = New_Products.TYPE
        chance = simulate._chance
        current_num_items = 0
        purchased_num_items = random.randint(1, params.group.maximum_items)

        def buy(p_type):
            """Pick a random product of ``p_type`` and record its sale."""
            nonlocal current_num_items
            product = New_Products.select(p_type)
            simulate.writeSalesTransaction(date_str, customer_number, product.sku, product.price)
            current_num_items += 1

        # Milk 70%; cereal 50% with milk, 5% without.
        if chance(70):
            buy(kinds.MILK)
            if chance(50):
                buy(kinds.CEREAL)
        elif chance(5):
            buy(kinds.CEREAL)

        # Baby food 20%; diapers 80% with baby food, 1% without.
        if chance(20):
            buy(kinds.BABY_FOOD)
            if chance(80):
                buy(kinds.DIAPERS)
        elif chance(1):
            buy(kinds.DIAPERS)

        # Bread 50%, independent of everything else.
        if chance(50):
            buy(kinds.BREAD)

        # Peanut butter 10%; jelly/jam 90% with peanut butter, 5% without.
        if chance(10):
            buy(kinds.PEANUT_BUTTER)
            if chance(90):
                buy(kinds.JELLY_JAM)
        elif chance(5):
            buy(kinds.JELLY_JAM)

        # Fill the rest of the basket with products from the 'other' category.
        while current_num_items < purchased_num_items:
            buy(kinds.OTHER)

    @staticmethod
    def writeSalesTransaction(date: str, customerNumber: int, sku, salesPrice: float):
        """Insert one row into ``transactions_sales``.

        Best effort: a failed insert is reported on stdout and skipped so one
        bad row does not abort the whole simulation.

        Args:
            date: the sale date as text.
            customerNumber: the customer's number within that day.
            sku: the product's SKU.
            salesPrice: the price charged for the item.
        """
        try:
            db.execute_sql_values('insert into transactions_sales values (?, ?, ?, ?)',
                                  (date, customerNumber, sku, salesPrice))
        except Exception as err:
            print("Error writing transactions_sales database table", err)
        else:
            # Count only rows that were actually written.
            db.commit_pending += 1


class customers_transactions_table():
    """Builds ``transactions_customers``: one row per customer per day with the day's total.

    Used as a namespace: ``build`` is called on the class itself.
    """
    num_days = 0  # number of days processed so far; drives the progress output

    # Per-customer totals for a single day. ``date`` is constant within the
    # filtered rows, so it can be selected alongside the GROUP BY column.
    _DAILY_TOTALS_SQL = '''
        SELECT date, customerNumber, ROUND(SUM(salesPrice), 2)
        FROM transactions_sales
        WHERE date = ?
        GROUP BY customerNumber
    '''

    @staticmethod
    def build():
        """Rebuild ``transactions_customers`` from ``transactions_sales``, one day at a time.

        Does nothing (beyond printing a notice) when the 'Populate Customers
        Transactions Data' switch in ARGS is off. The date range comes from
        params.simulation so it always matches the simulated sales.
        """
        if not ARGS['Populate Customers Transactions Data']:
            print("You don't want to populate the Customer transactions table")
            return

        db.execute_sql('DROP TABLE IF EXISTS transactions_customers')
        db.execute_sql(TABLE_DEFINITIONS['transactions_customers'])
        db.commit()

        start_date = params.simulation.start_date
        end_date = params.simulation.end_date

        current_date = start_date
        while current_date <= end_date:
            # Bind the date as ISO text (YYYY-MM-DD), the format the sales rows
            # were written with, instead of relying on sqlite3's implicit
            # date adapter.
            results = db.cur.execute(
                customers_transactions_table._DAILY_TOTALS_SQL,
                (current_date.isoformat(),),
            ).fetchall()

            for row in results:
                db.execute_sql_values('INSERT INTO transactions_customers VALUES (?, ?, ?)',
                                      (row[0], row[1], row[2]))
            db.commit()
            num_records = len(results)

            # Progress is reported on the first day, the last day, and every
            # display_daily_commits-th day in between.
            if (customers_transactions_table.num_days % params.debug.display_daily_commits == 0) \
                or (current_date == start_date) \
                or (current_date == end_date):

                print(f'{datetime.now()} - {current_date} - {num_records} records committed for customer transactions')

            customers_transactions_table.num_days += 1
            current_date += timedelta(1)

        
def run():
    """Run every build stage against ``store.db``.

    Stages, in order: products table, sales simulation, per-customer totals.
    Each stage checks its own switch in ARGS and skips itself when disabled.
    """
    db.connect()
    try:
        create_products_table()
        simulate.run()
        customers_transactions_table.build()
    finally:
        # Always release the connection, even when a stage fails part-way.
        # db.close() commits before closing.
        db.close()

# Notebook entry point: executing this cell runs the whole build immediately.
run()

Database Successfully Connected To
Committed 2075 products
Products in memory successfully populated.
2025-02-24 11:27:13.157081 - 2024-01-01 - 47909 records created and committing
2025-02-24 11:27:15.061523 - 2024-01-14 - 50177 records created and committing
2025-02-24 11:27:17.267014 - 2024-01-28 - 50368 records created and committing
2025-02-24 11:27:19.705069 - 2024-02-11 - 50107 records created and committing
2025-02-24 11:27:22.376874 - 2024-02-25 - 48723 records created and committing
2025-02-24 11:27:25.157100 - 2024-03-10 - 49143 records created and committing
2025-02-24 11:27:27.867768 - 2024-03-24 - 50720 records created and committing
2025-02-24 11:27:30.516772 - 2024-04-07 - 51508 records created and committing
2025-02-24 11:27:33.182170 - 2024-04-21 - 50147 records created and committing
2025-02-24 11:27:35.859163 - 2024-05-05 - 49479 records created and committing
2025-02-24 11:27:38.798639 - 2024-05-19 - 52429 records created and committing
2025-02-24 11:27:41.598710 - 