## Extraction

In [1]:
import pandas as pd
import sqlite3
from datetime import datetime, timedelta

def extract(data_source='C:\\Users\\flags\\End-sem DSA 2040\\DSA-2040_Practical_Exam_PatriciaKiarie781\\Data Warehousing\\ETL_Process\\raw\\Online Retail.xlsx'):
    """
    Extracts data from an Excel file into a pandas DataFrame.
    Handles missing values, data type conversions, and basic data validation.

    Cleaning steps, in order:
      1. Verify that all required columns are present.
      2. Drop rows without a CustomerID.
      3. Fill missing Description / Country values with placeholders.
      4. Cast CustomerID to int, InvoiceDate to datetime, and UnitPrice /
         Quantity to numeric (unparseable values become NaN).
      5. Drop rows whose UnitPrice or Quantity could not be parsed.

    Every exception raised while reading or cleaning is caught, reported on
    stdout, and turned into a ``None`` return value; nothing propagates.

    Args:
        data_source (str): Path to the Excel file containing retail data

    Returns:
        pd.DataFrame or None: Cleaned DataFrame if successful, None if extraction fails
    """
    print("\n--- Starting Extraction Phase ---")

    try:
        # Read the Excel file
        df_raw = pd.read_excel(data_source)
        print(f"Data extracted from {data_source}. Total rows: {len(df_raw)}")

        # Basic data validation
        required_columns = ['InvoiceNo', 'StockCode', 'Description', 'Quantity',
                            'InvoiceDate', 'UnitPrice', 'CustomerID', 'Country']
        missing_columns = [col for col in required_columns if col not in df_raw.columns]
        if missing_columns:
            print(f"Error: Missing required columns: {missing_columns}")
            return None

        # Handle missing values
        print("\nMissing values before cleaning:")
        print(df_raw.isnull().sum())

        # Drop rows with missing CustomerID as it's crucial for the customer dimension
        df_raw = df_raw.dropna(subset=['CustomerID']).copy()

        # Fill missing values for other columns.
        # Assign the result back instead of calling fillna(inplace=True) on a
        # column selection: the chained in-place form operates on a temporary
        # object and is not guaranteed to update the DataFrame.
        df_raw['Description'] = df_raw['Description'].fillna('Unknown Product')
        df_raw['Country'] = df_raw['Country'].fillna('Unknown')

        # Convert data types
        df_raw['CustomerID'] = df_raw['CustomerID'].astype(int)
        df_raw['InvoiceDate'] = pd.to_datetime(df_raw['InvoiceDate'])
        df_raw['UnitPrice'] = pd.to_numeric(df_raw['UnitPrice'], errors='coerce')
        df_raw['Quantity'] = pd.to_numeric(df_raw['Quantity'], errors='coerce')

        # Remove rows with invalid numeric values (coerced to NaN above)
        df_raw = df_raw.dropna(subset=['UnitPrice', 'Quantity'])

        print("\nMissing values after cleaning:")
        print(df_raw.isnull().sum())
        print(f"\nFinal row count: {len(df_raw)}")
        print("\nData types of columns:")
        print(df_raw.dtypes)

        print("\nExtraction complete.")
        return df_raw

    except FileNotFoundError:
        print(f"Error: Excel file '{data_source}' not found. Please ensure the file is in the correct directory.")
        return None
    except Exception as e:
        # Best-effort boundary: report the problem and signal failure with None.
        print(f"Error during data extraction: {e}")
        return None

## Transformation

In [19]:
def _most_common(values):
    """Return the most frequent value of a Series, or None if it has no mode."""
    modes = values.mode()
    return modes.iloc[0] if not modes.empty else None


def transform(df_raw, current_date=None):
    """
    Transforms the raw data by cleaning, filtering, and creating new columns/tables.

    Args:
        df_raw (pd.DataFrame): Frame with at least the columns 'Quantity',
            'UnitPrice', 'InvoiceDate' (datetime), 'CustomerID' and 'Country'.
        current_date (datetime, optional): Reference date for the
            "last year" filter. Defaults to August 12, 2025.

    Returns:
        tuple: ``(sales_fact, customer_dim, time_dim)`` where
            * sales_fact has columns customer_key, time_key, Quantity,
              UnitPrice, TotalSales (one row per retained sale),
            * customer_dim has columns CustomerID, total_purchases, country,
              customer_key (one row per customer),
            * time_dim has columns date, time_key, day_of_week, month,
              quarter, year (one row per calendar date).
    """
    print("\n--- Starting Transformation Phase ---")

    # Handle outliers by removing rows where Quantity <= 0 or UnitPrice <= 0.
    df_cleaned = df_raw[(df_raw['Quantity'] > 0) & (df_raw['UnitPrice'] > 0)].copy()
    print(f"Cleaned data. Removed {len(df_raw) - len(df_cleaned)} rows with invalid quantity or price.")

    # Calculate the new 'TotalSales' column.
    df_cleaned['TotalSales'] = df_cleaned['Quantity'] * df_cleaned['UnitPrice']
    print(f"Created 'TotalSales' column. New row count: {len(df_cleaned)}")

    # Filter the data for sales in the last year, assuming a current date of
    # August 12, 2025 unless the caller supplies another reference date.
    if current_date is None:
        current_date = datetime(2025, 8, 12)
    one_year_ago = current_date - timedelta(days=365)
    df_filtered = df_cleaned[df_cleaned['InvoiceDate'] >= one_year_ago].copy()
    print(f"Filtered for the last year. Rows remaining: {len(df_filtered)}")

    # Sort by InvoiceDate so fact rows come out in chronological order.
    df_filtered.sort_values('InvoiceDate', inplace=True)

    # Create the Customer Dimension table.
    customer_dim = df_filtered.groupby('CustomerID').agg(
        total_purchases=('TotalSales', 'sum'),
        country=('Country', _most_common)
    ).reset_index()
    customer_dim['customer_key'] = customer_dim.index + 1
    print(f"Created CustomerDim with {len(customer_dim)} unique customers.")

    # Create the Time Dimension table: one row per distinct calendar date.
    # Sort and renumber explicitly so time_key always follows date order.
    days = (
        df_filtered['InvoiceDate'].dt.normalize()
        .drop_duplicates()
        .sort_values()
        .reset_index(drop=True)
    )
    time_dim = pd.DataFrame({'date': days.dt.date})
    time_dim['time_key'] = time_dim.index + 1
    time_dim['day_of_week'] = days.dt.day_name()
    time_dim['month'] = days.dt.month_name()
    time_dim['quarter'] = days.dt.quarter
    time_dim['year'] = days.dt.year
    print(f"Created TimeDim with {len(time_dim)} unique dates.")

    # Prepare the Fact table by looking up the surrogate keys directly.
    # This avoids merging on an array-like key, where the name of the
    # resulting key column depends on how the keys were passed.
    customer_keys = customer_dim.set_index('CustomerID')['customer_key']
    time_keys = time_dim.set_index('date')['time_key']

    # Select the final columns for the Sales Fact table.
    sales_fact = pd.DataFrame({
        'customer_key': df_filtered['CustomerID'].map(customer_keys),
        'time_key': df_filtered['InvoiceDate'].dt.date.map(time_keys),
        'Quantity': df_filtered['Quantity'],
        'UnitPrice': df_filtered['UnitPrice'],
        'TotalSales': df_filtered['TotalSales'],
    }).reset_index(drop=True)
    print(f"Prepared SalesFact table with {len(sales_fact)} rows.")

    print("Transformation complete.")
    return sales_fact, customer_dim, time_dim

## Loading

In [20]:
def load(sales_fact, customer_dim, time_dim, db_name='retail_dw.db'):
    """
    Loads the transformed data into a SQLite database.

    The CustomerDim, TimeDim and SalesFact tables are dropped and recreated
    on every call, then filled from the given DataFrames.

    Args:
        sales_fact (pd.DataFrame): Fact rows with columns customer_key,
            time_key and either Quantity / UnitPrice / TotalSales or the
            schema names quantity / unit_price / total_sales.
        customer_dim (pd.DataFrame): Rows for CustomerDim (customer_key,
            CustomerID, total_purchases, country).
        time_dim (pd.DataFrame): Rows for TimeDim (time_key, date,
            day_of_week, month, quarter, year).
        db_name (str): Path of the SQLite database file.

    Returns:
        bool: True if all three tables were loaded and committed, False if
        an error occurred (the error is printed and a rollback is issued).
    """
    print("\n--- Starting Loading Phase ---")

    conn = sqlite3.connect(db_name)
    cursor = conn.cursor()
    print(f"Connected to database: {db_name}")

    try:
        # The fact table is created with snake_case column names, so map the
        # source-style names onto them. rename() ignores names that are
        # absent, so frames that already use the schema names pass through.
        sales_rows = sales_fact.rename(columns={
            'Quantity': 'quantity',
            'UnitPrice': 'unit_price',
            'TotalSales': 'total_sales',
        })

        # Drop tables if they exist to ensure a clean load. The fact table
        # goes first because it references both dimension tables.
        cursor.execute("DROP TABLE IF EXISTS SalesFact;")
        cursor.execute("DROP TABLE IF EXISTS CustomerDim;")
        cursor.execute("DROP TABLE IF EXISTS TimeDim;")

        # Create the new tables with the correct schema.
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS CustomerDim (
                customer_key INTEGER PRIMARY KEY,
                CustomerID TEXT NOT NULL UNIQUE,
                total_purchases REAL,
                country TEXT
            );
        """)

        cursor.execute("""
            CREATE TABLE IF NOT EXISTS TimeDim (
                time_key INTEGER PRIMARY KEY,
                date TEXT NOT NULL UNIQUE,
                day_of_week TEXT,
                month TEXT,
                quarter INTEGER,
                year INTEGER
            );
        """)

        cursor.execute("""
            CREATE TABLE IF NOT EXISTS SalesFact (
                sales_id INTEGER PRIMARY KEY AUTOINCREMENT,
                customer_key INTEGER,
                time_key INTEGER,
                quantity INTEGER NOT NULL,
                unit_price REAL NOT NULL,
                total_sales REAL NOT NULL,
                FOREIGN KEY(customer_key) REFERENCES CustomerDim(customer_key),
                FOREIGN KEY(time_key) REFERENCES TimeDim(time_key)
            );
        """)
        print("Database tables created.")

        # Load data from the pandas DataFrames into the SQLite tables.
        customer_dim.to_sql('CustomerDim', conn, if_exists='append', index=False)
        print(f"Loaded {len(customer_dim)} rows into CustomerDim.")

        time_dim.to_sql('TimeDim', conn, if_exists='append', index=False)
        print(f"Loaded {len(time_dim)} rows into TimeDim.")

        sales_rows.to_sql('SalesFact', conn, if_exists='append', index=False)
        print(f"Loaded {len(sales_rows)} rows into SalesFact.")

        conn.commit()
        print("Data loading complete. Changes committed.")
        return True

    except Exception as e:
        # Report and signal failure to the caller instead of failing silently.
        print(f"An error occurred during the loading phase: {e}")
        conn.rollback()
        return False
    finally:
        conn.close()
        print("Database connection closed.")

In [21]:
def run_etl():
    """
    Main function to execute the full ETL process.

    Runs extract -> transform -> load in sequence. The run stops early if
    extraction returns None. A ``False`` return value from ``load`` is
    treated as a failed load; any other return value (including ``None``)
    is treated as success. Any exception escaping a phase is caught and
    reported on stdout.
    """
    try:
        # Run the Extraction phase.
        raw_df = extract()
        if raw_df is None:
            return # Exit if extraction failed

        # Run the Transformation phase.
        sales_fact_df, customer_dim_df, time_dim_df = transform(raw_df)

        # Run the Loading phase.
        load_result = load(sales_fact_df, customer_dim_df, time_dim_df)
        if load_result is False:
            # Do not claim success when the load reported a failure.
            print("\nETL process failed during the loading phase.")
            return

        print("\nETL process completed successfully! 🎉")
    except Exception as e:
        print(f"An unexpected error occurred during the ETL process: {e}")

if __name__ == "__main__":
    run_etl()


--- Starting Extraction Phase ---
Error during data extraction: Missing optional dependency 'openpyxl'.  Use pip or conda to install openpyxl.
