# Imports

In [1]:
# imports

import psycopg2
import random
from psycopg2 import OperationalError
from faker import Faker
import os
from dotenv import load_dotenv

KEY TABLES

- [x] customer.customers
- [x] inventory.products
- [x] inventory.suppliers
- [x] mapping.states
- [x] employees.departments
- [x] mapping.payment_methods

SECONDARY TABLES

- [x] mapping.cities
- [x] customers.customer_addresses
- [x] inventory.shipments
- [x] employees.employees
- [x] sales.orders
- [x] sales.order_details
- [x] sales.payments



# Functions

## 1. Primary Tables

**1.1 customers.customers**

cols: customer_id, first_name, last_name, email, phone

In [None]:
def generate_customers(cursor, num_customers=50):

    print("Generating customers.customers data...")
    
    # Generate data
    fake = Faker()
    customers = []

    for _ in range(num_customers):
        first_name = fake.first_name()
        last_name = fake.last_name()
        email = fake.email()
        phone = fake.numerify('(###)###-####') # Define a pattern for a 10-digit phone number
        customers.append((first_name, last_name, email, phone))
    
    # Insert data
    query = """
    INSERT INTO customers.customers (first_name, last_name, email, phone)
    VALUES (%s, %s, %s, %s)
    """
    cursor.executemany(query, customers)

    # Confirmation message
    print("customers.customers populated successfully.")

**1.2 inventory.products**

cols: name, description, price, stock_quantity

In [None]:
def generate_products(cursor, num_products=50):

    print("Generating inventory.products data...")

    # Generate data
    fake = Faker()
    products = []

    for _ in range(num_products):
        name = fake.word().capitalize()  # Product name
        description = fake.sentence(nb_words=10)  # Short description
        price = round(random.uniform(10, 1000), 2)  # Price between 10 and 1000
        stock_quantity = fake.random_int(min=1, max=500)  # qty between 1 and 500
        products.append((name, description, price, stock_quantity))
    
    # Insert data
    query = """
    INSERT INTO inventory.products (name, description, price, stock_quantity)
    VALUES (%s, %s, %s, %s)
    """
    cursor.executemany(query, products)
    
    # Confirmation message
    print('inventory.products populated successfully.')


 **1.3 inventory.suppliers**

cols: supplier_id, name, contact_info

In [None]:
def generate_suppliers(cursor, num_suppliers=10):

    print("Generating inventory.suppliers data...")
    
    # Generate data
    fake = Faker()
    suppliers = []
    
    for _ in range(1, num_suppliers + 1):
        name = fake.company()
        contact_info = fake.phone_number()
        suppliers.append((name, contact_info))
    
    # Insert data
    query = """
        INSERT INTO inventory.suppliers(name, contact_info)
        VALUES (%s, %s)
        """
    cursor.executemany(query, suppliers)

    # Confirmation message
    print(f'inventory.suppliers populated with {num_suppliers} suppliers successfully.')  

**1.4 mapping.states**

cols: state_name, state_abbr

In [None]:
def generate_mapping_US_states(cursor):

    print("Generating mapping.states data...")
    
    # Generate data
    us_states = [
            ('Alabama', 'AL'), ('Alaska', 'AK'), ('Arizona', 'AZ'), ('Arkansas', 'AR'),
            ('California', 'CA'), ('Colorado', 'CO'), ('Connecticut', 'CT'), ('Delaware', 'DE'),
            ('Florida', 'FL'), ('Georgia', 'GA'), ('Hawaii', 'HI'), ('Idaho', 'ID'),
            ('Illinois', 'IL'), ('Indiana', 'IN'), ('Iowa', 'IA'), ('Kansas', 'KS'),
            ('Kentucky', 'KY'), ('Louisiana', 'LA'), ('Maine', 'ME'), ('Maryland', 'MD'),
            ('Massachusetts', 'MA'), ('Michigan', 'MI'), ('Minnesota', 'MN'), ('Mississippi', 'MS'),
            ('Missouri', 'MO'), ('Montana', 'MT'), ('Nebraska', 'NE'), ('Nevada', 'NV'),
            ('New Hampshire', 'NH'), ('New Jersey', 'NJ'), ('New Mexico', 'NM'), ('New York', 'NY'),
            ('North Carolina', 'NC'), ('North Dakota', 'ND'), ('Ohio', 'OH'), ('Oklahoma', 'OK'),
            ('Oregon', 'OR'), ('Pennsylvania', 'PA'), ('Rhode Island', 'RI'), ('South Carolina', 'SC'),
            ('South Dakota', 'SD'), ('Tennessee', 'TN'), ('Texas', 'TX'), ('Utah', 'UT'),
            ('Vermont', 'VT'), ('Virginia', 'VA'), ('Washington', 'WA'), ('West Virginia', 'WV'),
            ('Wisconsin', 'WI'), ('Wyoming', 'WY'), ('District of Columbia', 'DC')
        ]
    
    # Insert data
    query = """
    INSERT INTO mapping.states(state_name, state_abbr)
    VALUES (%s, %s)
    """
    cursor.executemany(query, us_states)

    # Confirmation message
    print('mapping.states populated successfully.')

**1.5 employees.departments**

cols: name, manager_id

In [None]:
def generate_departments(cursor):

    print("Generating employees.departments data...")

    # Generate data
    departments_data = [
        ('HR', None), ('IT', None), ('Sales', None), ('Marketing', None), ('Finance', None)
    ]

    # Insert data
    query = "INSERT INTO employees.departments(name, manager_id) VALUES (%s, %s)"
    cursor.executemany(query, departments_data)
    
    # Confirmation message
    print(f"employees.departments populated with {len(departments_data)} records successfully.")

**1.6 mapping.payment_methods**

In [None]:
def generate_mapping_payment_methods(cursor):

    print("Generating mapping.payment_methods data...")

    # Generate data
    methods = ['Cash', 'Credit Card', 'Debit Card', 'Check']
    payment_methods_data = [(m,) for m in methods]

    # Insert data
    query = """
    INSERT INTO mapping.payment_methods(method_name) 
    VALUES (%s)
    """
    cursor.executemany(query, payment_methods_data)

    # Confirmation message
    print(f"mapping.payment_methods populated with {len(payment_methods_data)} records successfully.")

## 2. Secondary Tables

**2.1 mapping.cities**

cols: city_state_id, city_name, state_id

In [None]:
def generate_mapping_cities(cursor, num_cities=100):

    print("Generating mapping.cities data...")
        
    # Generate data
    fake = Faker()
    cities_data = []

    # state_ids
    query_state_ids = """
    SELECT state_id FROM mapping.states;
    """
    cursor.execute(query_state_ids)
    state_ids = [r[0] for r in cursor.fetchall()]
    if not state_ids:
        raise ValueError("mapping.states is empty")

    for _ in range(1, num_cities+1):
        city_name = fake.city()
        state_id = random.choice(state_ids)
        cities_data.append((city_name, state_id))

    # Insert data
    query = """
    INSERT INTO mapping.cities (city_name, state_id)
    VALUES (%s, %s)
    """
    cursor.executemany(query, cities_data)

    # Confirmation message
    print(f'mapping.cities populated with {num_cities} records successfully.')

**2.2 customers.customer_addresses**

cols: address_id, customer_id, street, city_state_id, zip_code, address_type

In [None]:
def generate_customer_addresses(cursor):

    print("Generating customers.customer_addresses data...")
        
    # Generate data
    fake = Faker()
    customer_addresses_data = []
    addy_st_types = ['Street', 'Avenue', 'Boulevard', 'Court', 'Drive', 'Lane', 'Place', 'Road', 'Terrace']

    # city_state_ids
    query_city_state_ids = """
    SELECT city_state_id FROM mapping.cities;
    """
    cursor.execute(query_city_state_ids)
    city_state_ids = [r[0] for r in cursor.fetchall()]
    if not city_state_ids:
        raise ValueError("mapping.cities is empty")

    # customer_ids
    query_customer_ids = """
    SELECT customer_id FROM customers.customers;
    """
    cursor.execute(query_customer_ids)
    customer_ids = [r[0] for r in cursor.fetchall()]
    if not customer_ids:
        raise ValueError("customers.customers is empty")

    for customer_id in customer_ids:
        customer_id = customer_id
        addy_st_number = fake.building_number()
        addy_st_name = fake.street_name()
        addy_st_type = random.choice(addy_st_types)
        street = f"{addy_st_number} {addy_st_name} {addy_st_type}"
        city_state_id = random.choice(city_state_ids)
        zip_code = fake.zipcode()

        # 1. Add billing address
        customer_addresses_data.append((customer_id, street, city_state_id, zip_code, 'billing'))

        # 2. Add shipping address: Randomly decide if shipping address is the same or different than billing address
        if random.choice([True, False]):
            customer_addresses_data.append((customer_id, street, city_state_id, zip_code, 'shipping'))
        else:
            # Generate different shipping address
            addy_st_number = fake.building_number()
            addy_st_name = fake.street_name()
            addy_st_type = random.choice(addy_st_types)
            street = f"{addy_st_number} {addy_st_name} {addy_st_type}"
            city_state_id = random.choice(city_state_ids)
            zip_code = fake.zipcode()
            customer_addresses_data.append((customer_id, street, city_state_id, zip_code, 'shipping'))

    # Insert data
    query = f"""
    INSERT INTO customers.customer_addresses(customer_id, street, city_state_id, zip_code, address_type)
    VALUES (%s, %s, %s, %s, %s)
    """
    cursor.executemany(query, customer_addresses_data)

    # Confirmation message
    print(f'customers.customer_addresses populated with {len(customer_addresses_data)} records successfully.')

**2.3 inventory.shipments**

cols: shipment_id, supplier_id, shipment_date, status (shipped, delivered, pending)

In [77]:
def generate_inventory_shipments(cursor, num_shipments=50):

    print("Generating inventory.shipments data...")
    
    # Generate data
    fake = Faker()
    shipments = []

    # supplier_ids
    query_supplier_ids = """
    SELECT supplier_id FROM inventory.suppliers
    """
    cursor.execute(query_supplier_ids)
    supplier_ids = [r[0] for r in cursor.fetchall()]
    if not supplier_ids:
        raise ValueError("inventory.suppliers is empty")
    
    # status_options
    status_options = ['Shipped', 'Delivered', 'Pending']

    for _ in range(1, num_shipments + 1):
        supplier_id = random.choice(supplier_ids)
        shipment_date = fake.date_between(start_date='-1y', end_date='today')
        status = random.choice(status_options)
        shipments.append((supplier_id, shipment_date, status))
    
    # Insert data
    query = """
    INSERT INTO inventory.shipments(supplier_id, shipment_date, status)
    VALUES (%s, %s, %s)
    """
    cursor.executemany(query, shipments)

    # Confirmation message
    print(f'inventory.shipments populated with {num_shipments} shipments successfully.')

**2.4 employees.employees**

In [None]:
def generate_employees(cursor, num_employees=50):

    print("Generating employees.employees data...")
    
    # Generate data
    fake = Faker()
    employees_data = []
    
    for _ in range(num_employees):
        first_name = fake.first_name()
        last_name = fake.last_name()
        email = f"{first_name.lower()}.{last_name.lower()}@dummycompany.com"
        start_date = fake.date_between(start_date='-5y', end_date='today')
        dept_id = random.randint(1, 5)
        salary = random.randint(40000, 120000)
        employees_data.append((first_name, last_name, email, start_date, None, dept_id, salary))
        
    # Insert data
    query = """
    INSERT INTO employees.employees(first_name, last_name, email, start_date, end_date, department_id, salary)
    VALUES (%s, %s, %s, %s, %s, %s, %s)
    """
    cursor.executemany(query, employees_data)

    # Insertion confirmation message
    print(f'{num_employees} employees inserted successfully.')

    # Assign employees.departments.managers_id
    print('Assigning department managers based on highest salary...')
    
    update_manager_id_query = """
    WITH ranked AS (
        SELECT employee_id, department_id,
        ROW_NUMBER() OVER (PARTITION BY department_id ORDER BY salary DESC, employee_id ASC) AS rn
        FROM employees.employees
    )
    UPDATE employees.departments d
    SET manager_id = r.employee_id
    FROM ranked r
    WHERE d.department_id = r.department_id AND r.rn = 1;
    """
    cursor.execute(update_manager_id_query)
    
    # Update confirmation message
    print('employees.departments.manager_id updated successfully.')

**2.5 sales.orders**

cols: order_id, customer_id, order_date, total_amount

In [None]:
def generate_sales_orders(cursor, n=500):
    """
    Insert n orders with total_amount placeholder = 0.00.
    """

    print('Generating sales.orders data...')
    
    # Generate data
    fake = Faker()
    orders_data = []

    # customer_ids
    customer_ids_query = """
    SELECT customer_id FROM customers.customers;
    """
    cursor.execute(customer_ids_query)
    customer_ids = [r[0] for r in cursor.fetchall()]
    if not customer_ids:
        raise ValueError("customers.customers is empty")

    for _ in range(n):
        customer_id = random.choice(customer_ids)
        order_date = fake.date_between(start_date='-1y', end_date='today')
        total_amount_placeholder = 0.00
        orders_data.append((customer_id, order_date, total_amount_placeholder))

    # Insert data
    query = """
    INSERT INTO sales.orders (customer_id, order_date, total_amount)
    VALUES (%s, %s, %s)
    """
    cursor.executemany(query, orders_data)

    # Confirmation message
    print(f'sales.orders populated by {n} records successfully.')


**2.6 sales.order_details**

cols: order_detail_id- pkey, order_id- fkey from sales.orders, product_id- fkey from inventory.products, quantity, item_price, line_item_total

In [None]:
def generate_sales_order_details(cursor):
    """
    Insert order_details and update sales.orders.total_amount using sum(line_item_total).
    """

    print('Generating sales.order_details data...')
    
    # Generate data
    sales_order_details_data = []

    # order_ids
    order_ids_query = """
    SELECT order_id FROM sales.orders;
    """
    cursor.execute(order_ids_query)
    order_ids = [r[0] for r in cursor.fetchall()]
    if not order_ids:
        raise ValueError("sales.orders is empty")

    # product_ids, prices
    prod_ids_prices_query = """
    SELECT product_id, price FROM inventory.products;
    """
    cursor.execute(prod_ids_prices_query)
    rows = cursor.fetchall()
    if not rows:
        raise ValueError("No products found in inventory.products")
    prod_id_price_dict = {pid: float(price) for pid, price in rows}
    product_ids = list(prod_id_price_dict.keys())

    for order_id in order_ids:
        num_items = random.randint(1, 5)  # Each order has between 1 and 5 items
        chosen_products = random.sample(product_ids, num_items)

        for product_id in chosen_products:
            qty = random.randint(1, 10)  # Each item has a quantity between 1 and 10
            item_price = prod_id_price_dict[product_id]
            line_item_total = round(item_price * qty, 2)
            sales_order_details_data.append((order_id, product_id, qty, item_price, line_item_total))

    # Insert data
    query = """
    INSERT INTO sales.order_details (order_id, product_id, quantity, item_price, line_item_total)
    VALUES (%s, %s, %s, %s, %s)
    """
    cursor.executemany(query, sales_order_details_data)

    # Insertion confirmation message
    print(f'sales.order_details populated by {len(sales_order_details_data)} records successfully.')

    # update total_amount in sales.orders
    print("Updating sales.orders.total_amount from sales.order_details.line_item_total sums...")
    
    update_totals_query = """
        UPDATE sales.orders o
        SET total_amount = t.order_total
        FROM (
            SELECT order_id, ROUND(SUM(line_item_total)::numeric, 2) AS order_total
            FROM sales.order_details
            WHERE order_id = ANY(%s)
            GROUP BY order_id
        ) t
        WHERE o.order_id = t.order_id;
    """
    cursor.execute(update_totals_query, (order_ids,))

    # Update confirmation message
    print("sales.orders.total_amount updated successfully.")

**2.7 sales.payments**

In [None]:
def generate_payments(cursor):

    print("Generating sales payments data...")

    # Generate data
    # order_id, total_amount, order_date from sales.orders
    cursor.execute("SELECT order_id, total_amount, order_date FROM sales.orders;")
    orders = cursor.fetchall()  # List of (order_id, total_amount, order_date)
    if not orders:
        raise ValueError("sales.orders is empty")

    # payment_method_ids from mapping.payment_methods
    cursor.execute("SELECT payment_method_id FROM mapping.payment_methods;")
    payment_methods = [row[0] for row in cursor.fetchall()]  # List of payment_method_ids
    if not payment_methods:
        raise ValueError("mapping.payment_methods is empty")

    sales_payments_data = [
        (order_id, random.choice(payment_methods), total_amount, order_date)
        for order_id, total_amount, order_date in orders
    ]

    # Insert data
    insert_query = """
    INSERT INTO sales.payments (order_id, payment_method_id, amount, payment_date)
    VALUES (%s, %s, %s, %s);
    """
    cursor.executemany(insert_query, sales_payments_data)

    # Confirmation message
    print("sales.payments populated successfully.")


## 3. Database Utilities

**3.1 Connect to Database**

In [78]:
def get_connection():

    print("CONNECTING...\n Using following settings to connect with database:")

    # load env variables
    dotenv_path = '../.env'
    load_dotenv(dotenv_path)

    # confirm env variables are loaded
    DB_NAME = os.getenv("DB_NAME")
    DB_USER = os.getenv("DB_USER")
    DB_PASSWORD = os.getenv("DB_PASSWORD")
    DB_HOST = os.getenv("DB_HOST")
    DB_PORT = os.getenv("DB_PORT")

    print(
        f"DB_NAME: {DB_NAME}", 
        f"DB_USER: {DB_USER}", 
        f"DB_PASSWORD: {DB_PASSWORD}", 
        f"DB_HOST: {DB_HOST}", 
        f"DB_PORT: {DB_PORT}", 
        sep="\n"
        )
    
    # connect to database
    try:
        connection = psycopg2.connect(
            dbname = os.environ.get('DB_NAME'),
            user = os.environ.get('DB_USER'),
            password = os.environ.get('DB_PASSWORD'),
            host = os.environ.get('DB_HOST'),
            port = os.environ.get('DB_PORT')
        )
        print("Connection successful!")
        return connection
    
    except OperationalError as e:
        print(f"Error connecting to database: {e}")
        return None


**3.2 Truncate User Tables**

In [17]:
def clear_db_schema(cursor):
    """Dynamically truncates all user-defined tables."""
    print("\n--- MAINTENANCE ---\nClearing all user tables")
    
    dynamic_truncate = """
    SELECT 'TRUNCATE TABLE ' || 
           string_agg(quote_ident(schemaname) || '.' || quote_ident(tablename), ', ') || 
           ' RESTART IDENTITY CASCADE;'
    FROM pg_catalog.pg_tables
    WHERE schemaname NOT IN ('pg_catalog', 'information_schema')
      AND schemaname NOT LIKE 'pg_toast%'
      AND schemaname NOT LIKE 'pg_temp%';
    """
    
    cursor.execute(dynamic_truncate)
    row = cursor.fetchone()
    
    if row and row[0]:
        cursor.execute(row[0])
        print("Success: Database is now a clean slate.")
    else:
        print("Notice: No tables found to clear.")

## Populate Database

In [79]:
def run_data_generation():
    connection = get_connection()
    if not connection:
        return

    try:
        with connection:
            with connection.cursor() as cursor:

                # 1. Clear existing data
                clear_db_schema(cursor)

                # 2. Insert data
                print("\n--- INSERTING data into tables ---")

                # primary tables
                generate_customers(cursor)
                generate_products(cursor)
                generate_suppliers(cursor)
                generate_mapping_US_states(cursor)
                generate_departments(cursor)
                generate_payment_methods(cursor)
                
                # secondary tables
                generate_mapping_cities(cursor)
                generate_customer_addresses(cursor)
                generate_inventory_shipments(cursor)
                generate_employees(cursor)
                generate_sales_orders(cursor)
                generate_sales_order_details(cursor)
                generate_payments(cursor)
                
        print("\n--- SUCCESS! ---\nData generated and committed.")
        
    except Exception as e:
        print("\n--- ERROR ---")
        print(f"CRITICAL ERROR: {e}")
        print("--- UNROLLING changes or commits ---")
    finally:
        connection.close()
        print("\nCONNECTION CLOSED SAFELY")

# Execute

In [80]:
# Execute
run_data_generation()

CONNECTING...
 Using following settings to connect with database:
DB_NAME: dummy_company_abc_test
DB_USER: postgres
DB_PASSWORD: adminonly_5432
DB_HOST: localhost
DB_PORT: 5432
Connection successful!

--- MAINTENANCE ---
Clearing all user tables
Success: Database is now a clean slate.

--- INSERTING data into tables ---
Generating customers.customers data...
customers.customers populated successfully.
Generating inventory.products data...
inventory.products populated successfully.
Generating inventory.suppliers data...
inventory.suppliers populated with 10 suppliers successfully.
Generating mapping.states data...
mapping.states populated successfully.
Generating employees.departments data...
employees.departments populated with 5 records successfully.
Generating payment methods data...
mapping.payment_methods populated with 4 records successfully.
Generating mapping.cities data...
mapping.cities populated with 100 records successfully.
Generating customers.customer_addresses data...
cu