In [1]:
import pandas as pd
import sqlite3
import random
import logging
from datetime import datetime, timedelta

# ---------------- Logging setup ----------------
# Root logger: INFO and above, each record prefixed with timestamp and level.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# ---------------- Synthetic data generation ----------------
def generate_synthetic_data(n_rows=1000, seed=42):
    """Build a reproducible DataFrame of synthetic retail invoice lines.

    Args:
        n_rows: Number of rows to generate.
        seed: Seed for the private random generator. The default (42)
            reproduces the data the function has always produced.

    Returns:
        A DataFrame with the columns InvoiceNo, StockCode, Description,
        Quantity, InvoiceDate, UnitPrice, CustomerID and Country.
        InvoiceDate values fall between 2023-08-12 and 730 days later.
    """
    # Use a private generator instead of random.seed(): it yields the same
    # sequence for the same seed but leaves the module-level generator (and
    # therefore every other user of `random` in the process) untouched.
    rng = random.Random(seed)

    # NOTE: values are drawn column by column, in this exact order, so the
    # output for a given seed stays stable.
    invoice_nos = [f"INV{1000 + i}" for i in range(n_rows)]
    stock_codes = [f"P{rng.randint(100, 999)}" for _ in range(n_rows)]
    descriptions = [f"Product_{rng.randint(1, 50)}" for _ in range(n_rows)]
    quantities = [rng.randint(1, 50) for _ in range(n_rows)]
    unit_prices = [round(rng.uniform(1, 100), 2) for _ in range(n_rows)]
    customer_ids = [rng.randint(1, 100) for _ in range(n_rows)]
    country_choices = ["Kenya", "USA", "UK", "Germany", "Canada", "France"]
    countries = [rng.choice(country_choices) for _ in range(n_rows)]

    # Invoice dates are spread over a two-year window (0..730 days).
    start_date = datetime(2023, 8, 12)
    invoice_dates = [
        start_date + timedelta(days=rng.randint(0, 730)) for _ in range(n_rows)
    ]

    return pd.DataFrame({
        "InvoiceNo": invoice_nos,
        "StockCode": stock_codes,
        "Description": descriptions,
        "Quantity": quantities,
        "InvoiceDate": invoice_dates,
        "UnitPrice": unit_prices,
        "CustomerID": customer_ids,
        "Country": countries,
    })

# ---------------- ETL process ----------------
def etl_retail(
    *,
    db_path="retail_dw.db",
    cutoff_date=datetime(2024, 8, 12),
    data=None,
):
    """Run the retail ETL: extract rows, transform them, load them into SQLite.

    The three tables CustomerDim, TimeDim and SalesFact are dropped and
    recreated on every run, so the function can be re-run against the same
    database file.

    Args:
        db_path: SQLite database file to write to.
        cutoff_date: Only rows whose InvoiceDate is on or after this moment
            are loaded into SalesFact. The default corresponds to one year
            before the reference date 2025-08-12.
        data: Optional DataFrame to use instead of freshly generated
            synthetic data. It must carry the same columns that
            generate_synthetic_data() produces. The frame is not modified.

    Returns:
        None. Results are written to the database and row counts are logged.
    """
    logging.info("Starting ETL process...")

    # ---------------- Extract ----------------
    df = generate_synthetic_data() if data is None else data
    logging.info("Extracted %d rows.", len(df))

    # ---------------- Transform ----------------
    # Keep rows with a non-negative quantity and a strictly positive price.
    # The explicit copy means the column assignment below never writes onto a
    # slice of the source frame (which triggers SettingWithCopyWarning on
    # pandas versions without copy-on-write).
    is_valid = (df["Quantity"] >= 0) & (df["UnitPrice"] > 0)
    df = df.loc[is_valid].copy()
    logging.info("After removing invalid rows: %d rows remain.", len(df))

    df["TotalSales"] = df["Quantity"] * df["UnitPrice"]

    # Only the fact table is restricted to the recent window; the customer
    # and time dimensions below are built from every valid row.
    df_last_year = df[df["InvoiceDate"] >= cutoff_date]
    logging.info("Filtered last year data: %d rows.", len(df_last_year))

    # One row per customer: lifetime spend plus the first country seen.
    customer_summary = (
        df.groupby("CustomerID")
        .agg(TotalPurchases=("TotalSales", "sum"), Country=("Country", "first"))
        .reset_index()
    )

    # One row per distinct calendar day, numbered in order of first appearance.
    unique_days = pd.to_datetime(df["InvoiceDate"].dt.date.unique())
    time_dim = pd.DataFrame({
        "TimeID": range(1, len(unique_days) + 1),
        "Date": unique_days,
    })
    time_dim["Year"] = time_dim["Date"].dt.year
    time_dim["Quarter"] = time_dim["Date"].dt.quarter

    # ---------------- Load ----------------
    schema_statements = (
        "DROP TABLE IF EXISTS SalesFact",
        "DROP TABLE IF EXISTS CustomerDim",
        "DROP TABLE IF EXISTS TimeDim",
        """
        CREATE TABLE CustomerDim (
            CustomerID INTEGER PRIMARY KEY,
            TotalPurchases REAL,
            Country TEXT
        )
        """,
        """
        CREATE TABLE TimeDim (
            TimeID INTEGER PRIMARY KEY,
            Date TEXT,
            Year INTEGER,
            Quarter INTEGER
        )
        """,
        """
        CREATE TABLE SalesFact (
            InvoiceNo TEXT,
            StockCode TEXT,
            Description TEXT,
            Quantity INTEGER,
            InvoiceDate TEXT,
            UnitPrice REAL,
            CustomerID INTEGER,
            Country TEXT,
            TotalSales REAL,
            FOREIGN KEY(CustomerID) REFERENCES CustomerDim(CustomerID)
        )
        """,
    )

    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        for statement in schema_statements:
            cursor.execute(statement)

        # Dimensions first, then the fact table that references CustomerDim.
        customer_summary.to_sql("CustomerDim", conn, if_exists="append", index=False)
        time_dim.to_sql("TimeDim", conn, if_exists="append", index=False)
        df_last_year.to_sql("SalesFact", conn, if_exists="append", index=False)

        conn.commit()
    finally:
        # Always release the database handle, even if a statement above fails.
        conn.close()

    logging.info("ETL process complete.")
    logging.info("CustomerDim rows: %d", len(customer_summary))
    logging.info("TimeDim rows: %d", len(time_dim))
    logging.info("SalesFact rows: %d", len(df_last_year))

# Run the ETL only when this file is executed directly, not when it is imported.
if __name__ == "__main__":
    etl_retail()


2025-08-14 17:17:37,624 - INFO - Starting ETL process...
2025-08-14 17:17:37,634 - INFO - Extracted 1000 rows.
2025-08-14 17:17:37,636 - INFO - After removing invalid rows: 1000 rows remain.
2025-08-14 17:17:37,639 - INFO - Filtered last year data: 499 rows.
2025-08-14 17:17:37,700 - INFO - ETL process complete.
2025-08-14 17:17:37,701 - INFO - CustomerDim rows: 100
2025-08-14 17:17:37,701 - INFO - TimeDim rows: 523
2025-08-14 17:17:37,702 - INFO - SalesFact rows: 499
