# ETL Process for Retail Data Warehouse

This notebook extracts raw retail sales data, transforms it into a star-schema format, 
and loads it into an SQLite database for OLAP analysis.

# 1. Imports

In [1]:
import os, sqlite3, random
import numpy as np
import pandas as pd
from IPython.display import display


# 2. Paths

In [2]:
ETL_DIR = os.getcwd()
CSV_PATH = os.path.join(ETL_DIR, "online_retail.csv")  # will be created here if missing
DB_PATH  = os.path.join(ETL_DIR, "retail_dw.db")

CURRENT_DATE = pd.Timestamp("2025-08-13")
ONE_YEAR_AGO = CURRENT_DATE - pd.Timedelta(days=365)

print("Notebook folder:", ETL_DIR)
print("CSV path       :", CSV_PATH)
print("SQLite DB path :", DB_PATH)

Notebook folder: c:\Users\billg\Documents\Kyra ;)\DSA-2040_Practical_Exam_Kyra_619\DSA-2040_Practical_Exam_Kyra_619\data_warehouse\etl
CSV path       : c:\Users\billg\Documents\Kyra ;)\DSA-2040_Practical_Exam_Kyra_619\DSA-2040_Practical_Exam_Kyra_619\data_warehouse\etl\online_retail.csv
SQLite DB path : c:\Users\billg\Documents\Kyra ;)\DSA-2040_Practical_Exam_Kyra_619\DSA-2040_Practical_Exam_Kyra_619\data_warehouse\etl\retail_dw.db


# 3. Create retail CSV

In [3]:
def create_retail_csv(path=CSV_PATH, n_rows=1000, seed=42):
    random.seed(seed); np.random.seed(seed)
    countries  = ["United Kingdom","Germany","France","Spain","Netherlands","Norway","Sweden","Finland","Denmark","Ireland"]
    categories = ["Electronics","Clothing","Home","Toys","Beauty"]

    n_customers = 100
    customer_ids = [10000 + i for i in range(n_customers)]
    country_of   = {cid: random.choice(countries) for cid in customer_ids}

    n_products = 150
    stock_codes = [f"S{1000+i}" for i in range(n_products)]
    unit_price  = {code: round(random.uniform(1.0, 100.0), 2) for code in stock_codes}
    cat_of      = {code: random.choice(categories) for code in stock_codes}

    end_date = CURRENT_DATE
    start_date = end_date - pd.Timedelta(days=730)

    rows = []
    for i in range(n_rows):
        invoice = f"INV{100000+i}"
        code = random.choice(stock_codes)
        desc = f"{cat_of[code]} - Item {random.randint(1,999)}"
        qty  = random.randint(1, 50)
        offset_days = np.random.randint(0, (end_date - start_date).days + 1)
        dt = start_date + pd.Timedelta(days=int(offset_days), seconds=int(np.random.randint(0, 86400)))
        cid = random.choice(customer_ids)
        rows.append([invoice, code, desc, qty, dt, unit_price[code], cid, country_of[cid]])

    df = pd.DataFrame(rows, columns=[
        "InvoiceNo","StockCode","Description","Quantity","InvoiceDate","UnitPrice","CustomerID","Country"
    ])
    df.to_csv(path, index=False)
    return path

if not os.path.exists(CSV_PATH):
    create_retail_csv(CSV_PATH, n_rows=1000)
else:
    print("CSV already exists; not overwriting.")

display(pd.read_csv(CSV_PATH, nrows=5))


CSV already exists; not overwriting.


Unnamed: 0,InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
0,INV100000,S1045,Clothing - Item 595,17,2023-11-24 04:23:15,1.06,10004,Spain
1,INV100001,S1027,Clothing - Item 611,28,2024-05-10 21:20:20,12.04,10044,Finland
2,INV100002,S1080,Home - Item 447,39,2025-04-19 01:44:25,22.8,10065,Norway
3,INV100003,S1029,Toys - Item 395,37,2024-11-22 10:19:54,45.92,10024,Finland
4,INV100004,S1065,Home - Item 46,46,2024-11-14 12:15:31,42.89,10055,Germany


# 4. ETL pipeline

In [None]:
def run_etl(csv_path=CSV_PATH, db_path=DB_PATH):
    DDL = """
    CREATE TABLE IF NOT EXISTS CustomerDim (
        CustomerKey INTEGER PRIMARY KEY,
        CustomerID INTEGER,
        Country TEXT,
        total_purchases REAL,
        total_transactions INTEGER
    );
    CREATE TABLE IF NOT EXISTS ProductDim (
        ProductKey INTEGER PRIMARY KEY,
        StockCode TEXT,
        Description TEXT,
        Category TEXT,
        UnitPrice REAL
    );
    CREATE TABLE IF NOT EXISTS TimeDim (
        TimeKey INTEGER PRIMARY KEY,
        Date TEXT,
        Day INTEGER,
        Month INTEGER,
        MonthName TEXT,
        Quarter INTEGER,
        Year INTEGER
    );
    CREATE TABLE IF NOT EXISTS SalesFact (
        SalesFactID INTEGER PRIMARY KEY,
        InvoiceNo TEXT,
        CustomerKey INTEGER,
        ProductKey INTEGER,
        TimeKey INTEGER,
        Quantity INTEGER,
        UnitPrice REAL,
        TotalSales REAL,
        FOREIGN KEY (CustomerKey) REFERENCES CustomerDim(CustomerKey),
        FOREIGN KEY (ProductKey) REFERENCES ProductDim(ProductKey),
        FOREIGN KEY (TimeKey) REFERENCES TimeDim(TimeKey)
    );
    """

    # --- Extract
    df = pd.read_csv(csv_path, parse_dates=["InvoiceDate"])
    print("[ETL] Extracted rows:", len(df))

    # --- Transform
    df = df[(df["Quantity"] >= 0) & (df["UnitPrice"] > 0)].copy()
    df["TotalSales"] = df["Quantity"] * df["UnitPrice"]
    df = df[(df["InvoiceDate"] >= ONE_YEAR_AGO) & (df["InvoiceDate"] <= CURRENT_DATE)].copy()
    print("[ETL] After last-year filter:", len(df))

    # Dimensions
    cust = (df.groupby(["CustomerID","Country"], as_index=False)
              .agg(total_purchases=("TotalSales","sum"),
                   total_transactions=("InvoiceNo","nunique")))
    cust["CustomerKey"] = range(1, len(cust)+1)
    customer_dim = cust[["CustomerKey","CustomerID","Country","total_purchases","total_transactions"]]

    prod = df[["StockCode","Description","UnitPrice"]].drop_duplicates().copy()
    prod["Category"] = prod["Description"].str.split(" - ").str[0].fillna("Other")
    prod["ProductKey"] = range(1, len(prod)+1)
    product_dim = prod[["ProductKey","StockCode","Description","Category","UnitPrice"]]

    tdf = pd.DataFrame({"Date": pd.to_datetime(df["InvoiceDate"].dt.date.unique())})
    tdf["TimeKey"] = range(1, len(tdf)+1)
    tdf["Day"] = tdf["Date"].dt.day
    tdf["Month"] = tdf["Date"].dt.month
    tdf["MonthName"] = tdf["Date"].dt.strftime("%b")
    tdf["Quarter"] = ((tdf["Month"] - 1)//3 + 1)
    tdf["Year"] = tdf["Date"].dt.year
    time_dim = tdf[["TimeKey","Date","Day","Month","MonthName","Quarter","Year"]]

    # --- Map surrogate keys to fact rows
    cust_map = {(r.CustomerID, r.Country): r.CustomerKey for r in cust.itertuples(index=False)}
    prod_map = {r.StockCode: r.ProductKey for r in product_dim.itertuples(index=False)}
    time_map = {r.Date: r.TimeKey for r in time_dim.itertuples(index=False)}

    sf = df.copy()
    sf["DateOnly"]     = sf["InvoiceDate"].dt.date
    sf["CustomerKey"]  = [cust_map[(cid, ctry)] for cid, ctry in zip(sf["CustomerID"], sf["Country"])]
    sf["ProductKey"]   = [prod_map[sc] for sc in sf["StockCode"]]
    sf["TimeKey"]      = [time_map[pd.to_datetime(d)] for d in sf["DateOnly"]]

    sales_fact = sf[["InvoiceNo","CustomerKey","ProductKey","TimeKey","Quantity","UnitPrice","TotalSales"]].reset_index(drop=True)
    sales_fact["SalesFactID"] = range(1, len(sales_fact)+1)
    sales_fact = sales_fact[["SalesFactID","InvoiceNo","CustomerKey","ProductKey","TimeKey","Quantity","UnitPrice","TotalSales"]]

    # --- Load (context manager ensures the file is released on Windows)
    with sqlite3.connect(db_path) as con:
        con.executescript(DDL)
        customer_dim.to_sql("CustomerDim", con, if_exists="replace", index=False)
        product_dim.to_sql("ProductDim",  con, if_exists="replace", index=False)
        time_dim.to_sql("TimeDim",        con, if_exists="replace", index=False)
        sales_fact.to_sql("SalesFact",    con, if_exists="replace", index=False)
        con.commit()

        # Row counts (sanity)
        for tbl in ["CustomerDim","ProductDim","TimeDim","SalesFact"]:
            cnt = pd.read_sql_query(f"SELECT COUNT(*) AS c FROM {tbl};", con)["c"][0]
            print(f"[ETL] {tbl} rows:", cnt)


# 5. Run the ETL

In [None]:
run_etl()
with sqlite3.connect(DB_PATH) as con:
    display(pd.read_sql_query("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name;", con))


[ETL] Extracted rows: 1000
[ETL] After last-year filter: 509
[ETL] CustomerDim rows: 100
[ETL] ProductDim rows: 509
[ETL] TimeDim rows: 270
[ETL] SalesFact rows: 509


Unnamed: 0,name
0,CustomerDim
1,ProductDim
2,SalesFact
3,TimeDim


# 6. Validate the load

In [6]:
with sqlite3.connect(DB_PATH) as con:
    print("Row counts:")
    display(pd.read_sql_query("""
    SELECT 'CustomerDim' AS table_name, COUNT(*) AS cnt FROM CustomerDim
    UNION ALL SELECT 'ProductDim', COUNT(*) FROM ProductDim
    UNION ALL SELECT 'TimeDim', COUNT(*) FROM TimeDim
    UNION ALL SELECT 'SalesFact', COUNT(*) FROM SalesFact;
    """, con))

    print("Integrity (expect 0 bad rows):")
    display(pd.read_sql_query("""
    SELECT COUNT(*) AS bad_rows
    FROM SalesFact
    WHERE ROUND(TotalSales, 2) <> ROUND(Quantity * UnitPrice, 2);
    """, con))


Row counts:


Unnamed: 0,table_name,cnt
0,CustomerDim,100
1,ProductDim,509
2,TimeDim,270
3,SalesFact,509


Integrity (expect 0 bad rows):


Unnamed: 0,bad_rows
0,0
