# ETL: Online Retail → retail_dw.db (Section 1 Task 2)

This notebook runs the ETL pipeline:
- Extract: read the source workbook `data/Online_Retail.xlsx`
- Transform: cleaning, TotalSales, categories, time dim
- Load: write CustomerDim, ProductDim, TimeDim, SalesFact into outputs/db/retail_dw.db

All outputs are saved under the `outputs/` folder.

## Setup and paths

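Import libraries and set up file paths and output directories. This cell defines where the input data is expected and where outputs are written. Paths are resolved relative to the parent of the current working directory (`Path.cwd().parent`), so the notebook must be launched from a folder directly under the project root.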

In [2]:
import pandas as pd
import sqlite3
import logging
from pathlib import Path
from datetime import datetime

# Paths are anchored one level above the working directory, i.e. the
# notebook is expected to run from a sub-folder of the project root.
PROJECT_ROOT = Path.cwd().parent
DATA_PATH = PROJECT_ROOT.joinpath('data', 'Online_Retail.xlsx')
DB_PATH = PROJECT_ROOT.joinpath('outputs', 'db', 'retail_dw.db')
IMAGES_DIR = PROJECT_ROOT.joinpath('outputs', 'images')
REPORTS_DIR = PROJECT_ROOT.joinpath('outputs', 'reports')

# Create every output folder up front; exist_ok keeps re-runs idempotent.
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
IMAGES_DIR.mkdir(parents=True, exist_ok=True)
REPORTS_DIR.mkdir(parents=True, exist_ok=True)

logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
print('DATA_PATH =', DATA_PATH)
print('DB_PATH =', DB_PATH)


DATA_PATH = c:\Users\HP\Documents\DSA2040_Practical_Exam_Geoffrey_Mwangi_566\data\Online_Retail.xlsx
DB_PATH = c:\Users\HP\Documents\DSA2040_Practical_Exam_Geoffrey_Mwangi_566\outputs\db\retail_dw.db


## Extract: read the source file

Define a function to read the Excel file into a pandas DataFrame.

In [3]:
def read_retail(path=DATA_PATH):
    """Read the raw Online Retail data into a DataFrame.

    Parameters
    ----------
    path : pathlib.Path or str, default DATA_PATH
        Location of the source file. Excel workbooks (``.xlsx``/``.xlsm``/
        ``.xls``, matched case-insensitively) are read with ``pd.read_excel``;
        any other suffix is read with ``pd.read_csv``.

    Returns
    -------
    pandas.DataFrame
        The raw rows, unmodified.

    Raises
    ------
    FileNotFoundError
        If ``path`` does not exist.
    """
    path = Path(path)  # accept plain strings as well as Path objects
    if not path.exists():
        raise FileNotFoundError(f"Data file not found: {path}")
    suffix = path.suffix.lower()  # 'Data.XLSX' is still an Excel file
    if suffix in ('.xlsx', '.xlsm'):
        df = pd.read_excel(path, engine='openpyxl')
    elif suffix == '.xls':
        # openpyxl cannot read legacy .xls workbooks; let pandas choose the
        # appropriate engine instead of forcing openpyxl.
        df = pd.read_excel(path)
    else:
        df = pd.read_csv(path)
    logging.info(f"Read {len(df)} rows from {path.name}")
    return df


## Transform: cleaning and feature engineering

This function:
- parses `InvoiceDate` to datetime,
- drops rows missing `InvoiceDate` or `CustomerID`,
- converts `Quantity` and `UnitPrice` to numeric and removes invalid values,
- computes `TotalSales = Quantity * UnitPrice`,
- maps `Description` to a simple `Category`,
- computes `invoice_date` and `is_weekend`,
- filters to the last-year window (2024-08-12 → 2025-08-12) and falls back to the full cleaned dataset if empty.

In [4]:
def clean_transform(df, end_date='2025-08-12'):
    """Clean raw retail rows and add derived columns for the warehouse.

    Steps:
    - parse ``InvoiceDate`` (unparseable values become NaT);
    - drop rows missing ``InvoiceDate`` or ``CustomerID``;
    - coerce ``Quantity``/``UnitPrice`` to numbers and keep only rows where
      both are strictly positive (unparseable values are coerced to 0 first,
      so they are dropped too, as are zero and negative quantities);
    - add ``TotalSales``, a keyword-based ``Category``, ``invoice_date``
      (calendar date) and ``is_weekend`` (1 for Saturday/Sunday, else 0);
    - keep the one-year window ending on ``end_date``. Both end days are
      included. If the window is empty, return the full cleaned frame.

    Parameters
    ----------
    df : pandas.DataFrame
        Raw rows with at least InvoiceDate, CustomerID, Quantity, UnitPrice
        and Description columns. Not modified.
    end_date : str or datetime-like, default '2025-08-12'
        Last calendar day of the window; any time component is ignored.

    Returns
    -------
    pandas.DataFrame
        A new, independent frame of cleaned rows.
    """
    df = df.copy()
    # parse date
    df['InvoiceDate'] = pd.to_datetime(df['InvoiceDate'], errors='coerce')

    # drop missing InvoiceDate or CustomerID; .copy() so the column
    # assignments below act on an independent frame (no SettingWithCopyWarning)
    df = df.dropna(subset=['InvoiceDate', 'CustomerID']).copy()

    # numeric conversions: unparseable values become 0 and are then rejected
    # by the strictly-positive filter below
    df['Quantity'] = pd.to_numeric(df['Quantity'], errors='coerce').fillna(0).astype(int)
    df['UnitPrice'] = pd.to_numeric(df['UnitPrice'], errors='coerce').fillna(0.0)
    before = len(df)
    df = df[(df['Quantity'] > 0) & (df['UnitPrice'] > 0)].copy()
    logging.info(f'Removed {before - len(df)} rows with invalid Quantity/UnitPrice')

    # compute TotalSales
    df['TotalSales'] = df['Quantity'] * df['UnitPrice']

    # category mapping
    def categorize(desc):
        # NOTE(review): plain substring matching, so e.g. 'dress' also matches
        # 'address' and 'cup' matches 'cupid'; first matching rule wins.
        if not isinstance(desc, str):
            return 'Other'
        d = desc.lower()
        if any(k in d for k in ['usb', 'battery', 'camera', 'phone', 'charger', 'electronic', 'speaker']):
            return 'Electronics'
        if any(k in d for k in ['t-shirt', 'shirt', 'dress', 'jacket', 'coat', 'sweater']):
            return 'Clothing'
        if 'book' in d:
            return 'Books'
        if any(k in d for k in ['cup', 'mug', 'bottle', 'glass']):
            return 'Home'
        return 'Other'
    df['Category'] = df['Description'].apply(categorize)

    # invoice_date and weekend flag (dayofweek: Monday=0 ... Sunday=6)
    df['invoice_date'] = df['InvoiceDate'].dt.date
    df['is_weekend'] = df['InvoiceDate'].dt.dayofweek.isin([5, 6]).astype(int)

    # filter to the one-year window ending on end_date (default 2024-08-12 -> 2025-08-12)
    cutoff_end = pd.to_datetime(end_date).normalize()
    cutoff_start = cutoff_end - pd.DateOffset(years=1)
    # Half-open upper bound so every timestamp on the end date itself is
    # kept; `<= cutoff_end` would only keep rows at exactly midnight.
    window_stop = cutoff_end + pd.Timedelta(days=1)
    in_window = (df['InvoiceDate'] >= cutoff_start) & (df['InvoiceDate'] < window_stop)
    df_last_year = df[in_window].copy()
    if df_last_year.empty:
        logging.warning(f'No rows in last-year window ({cutoff_start.date()} to {cutoff_end.date()}). Falling back to full cleaned dataset for ETL demonstration.')
        df_last_year = df.copy()
    else:
        logging.info(f'Filtered to last year: {len(df_last_year)} rows between {cutoff_start.date()} and {cutoff_end.date()}')
    return df_last_year


## Build dimensions and SalesFact

This function builds:
- CustomerDim (aggregations per customer),
- ProductDim (unique products with product_id),
- TimeDim (per invoice date),
- SalesFact (fact rows referencing dims).

In [5]:
def build_dims_and_fact(df):
    """Build the star-schema tables from a cleaned transaction frame.

    Parameters
    ----------
    df : pandas.DataFrame
        Output of ``clean_transform``. Needs CustomerID, InvoiceNo, Country,
        StockCode, Description, Category, Quantity, UnitPrice, TotalSales
        and invoice_date columns.

    Returns
    -------
    tuple of pandas.DataFrame
        ``(cust, prod, time_df, fact)``:
        - cust: one row per customer (customer_id, country, total_purchases,
          num_invoices); country is the customer's most frequent value, or
          None when all of their Country values are missing.
        - prod: one row per distinct (StockCode, Description, Category)
          combination with a 1-based product_id.
        - time_df: one row per distinct invoice_date with year/month/quarter
          and a 1-based time_id in date order.
        - fact: one row per input row, referencing the dimension ids.
    """
    # CustomerDim
    def _most_common(values):
        # mode() ignores NaN, so an all-missing group yields an empty result;
        # return None instead of raising IndexError on .iloc[0].
        modes = values.mode()
        return modes.iloc[0] if not modes.empty else None

    cust = (
        df.groupby('CustomerID')
        .agg(
            total_purchases=('TotalSales', 'sum'),
            num_invoices=('InvoiceNo', 'nunique'),
            country=('Country', _most_common),
        )
        .reset_index()
        .rename(columns={'CustomerID': 'customer_id'})
    )
    cust['customer_id'] = cust['customer_id'].astype(int)
    cust = cust[['customer_id', 'country', 'total_purchases', 'num_invoices']]

    # ProductDim: a StockCode can appear with several descriptions, so a
    # product row is identified by the full (code, description, category) key.
    product_cols = {'StockCode': 'stock_code', 'Description': 'description', 'Category': 'category'}
    prod = (
        df[list(product_cols)]
        .drop_duplicates()
        .rename(columns=product_cols)
        .reset_index(drop=True)
    )
    prod['product_id'] = prod.index + 1
    prod = prod[['product_id', 'stock_code', 'description', 'category']]

    # TimeDim
    time_df = pd.DataFrame({'invoice_date': sorted(df['invoice_date'].unique())})
    parsed_dates = pd.to_datetime(time_df['invoice_date'])
    time_df['year'] = parsed_dates.dt.year
    time_df['month'] = parsed_dates.dt.month
    time_df['quarter'] = parsed_dates.dt.quarter
    time_df['time_id'] = time_df.index + 1
    time_df = time_df[['time_id', 'invoice_date', 'year', 'month', 'quarter']]

    # SalesFact: join on the same key ProductDim was deduplicated on, so each
    # sale references the product row matching its own description (a map on
    # stock_code alone would send every sale to the last description's id).
    product_keys = prod.rename(columns={v: k for k, v in product_cols.items()})
    fact = df.merge(product_keys, on=list(product_cols), how='left')
    fact['customer_id'] = fact['CustomerID'].astype(int)
    time_map = dict(zip(time_df['invoice_date'], time_df['time_id']))
    fact['time_id'] = fact['invoice_date'].map(time_map)
    fact = fact[['InvoiceNo', 'product_id', 'customer_id', 'time_id', 'Quantity', 'UnitPrice', 'TotalSales']].rename(
        columns={'InvoiceNo': 'invoice_no', 'Quantity': 'quantity', 'UnitPrice': 'unit_price', 'TotalSales': 'total_sales'}
    )
    fact = fact.dropna(subset=['product_id', 'customer_id', 'time_id'])
    return cust, prod, time_df, fact


## Load: write DataFrames to SQLite

Write the dims and fact tables into the SQLite DB.

In [6]:
def load_to_sqlite(cust, prod, time_df, fact, db_path=DB_PATH):
    """Write the dimension and fact tables to a SQLite database.

    Each table is replaced if it already exists (``if_exists='replace'``),
    and the row count of every table is logged afterwards.

    Parameters
    ----------
    cust, prod, time_df, fact : pandas.DataFrame
        Written to CustomerDim, ProductDim, TimeDim and SalesFact
        respectively, without the DataFrame index.
    db_path : path-like or str, default DB_PATH
        SQLite database file to create or overwrite tables in.
    """
    tables = {
        'CustomerDim': cust,
        'ProductDim': prod,
        'TimeDim': time_df,
        'SalesFact': fact,
    }
    conn = sqlite3.connect(db_path)
    try:
        for name, frame in tables.items():
            frame.to_sql(name, conn, if_exists='replace', index=False)
        conn.commit()
        # log counts; table names come from the fixed dict above, so the
        # f-string SQL is not built from external input
        for name in tables:
            (count,) = conn.execute(f'SELECT COUNT(*) FROM {name}').fetchone()
            logging.info(f"{name} rows: {count}")
    finally:
        # always release the file handle, even if a write or query fails
        conn.close()


## Run the full ETL pipeline

This final cell contains the commands to run the ETL end-to-end.

In [None]:
# Extract -> Transform -> build star schema -> Load, end to end.
df_raw = read_retail(DATA_PATH)
df_clean = clean_transform(df_raw)
cust, prod, time_df, fact = build_dims_and_fact(df_clean)
load_to_sqlite(cust, prod, time_df, fact, db_path=DB_PATH)
print('ETL complete. DB at', DB_PATH)

2025-08-14 22:13:51,863 - INFO - Read 541909 rows from Online_Retail.xlsx
2025-08-14 22:13:52,113 - INFO - Removed 8945 rows with invalid Quantity/UnitPrice
2025-08-14 22:13:56,180 - INFO - CustomerDim rows: 4338
2025-08-14 22:13:56,180 - INFO - ProductDim rows: 3897
2025-08-14 22:13:56,180 - INFO - TimeDim rows: 305
2025-08-14 22:13:56,199 - INFO - SalesFact rows: 397884


ETL complete. DB at c:\Users\HP\Documents\DSA2040_Practical_Exam_Geoffrey_Mwangi_566\outputs\db\retail_dw.db
Ready: uncomment the ETL run cell to execute the pipeline.
