# DSA 2040 Practical Exam - Section 1, Task 2
## ETL Process Implementation for Retail Data Warehouse

**Student:** Monaheng218  
**Date:** August 13, 2025  
**Total Marks:** 20

### Task Requirements:
1. Generate synthetic retail dataset (~1000 rows)
2. Extract data with validation and cleaning
3. Transform data for star schema design
4. Load data into SQLite database
5. Verify data integrity and provide summary

In [1]:
# =============================================================================
# IMPORTS AND SETUP
# =============================================================================

import pandas as pd
import numpy as np
import sqlite3
import random
from datetime import datetime, timedelta
from faker import Faker
import logging
import os

# Set random seeds for reproducible results
np.random.seed(42)
random.seed(42)
fake = Faker()
Faker.seed(42)

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('etl_process.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

print("✅ Imports and setup complete")
print(f"Python version: {pd.__version__} (pandas)")
print(f"NumPy version: {np.__version__}")

✅ Imports and setup complete
Python version: 2.0.3 (pandas)
NumPy version: 1.24.3


In [4]:
# =============================================================================
# ETL FUNCTIONS DEFINITION
# =============================================================================

def generate_synthetic_retail_data(num_rows=1000):
    """
    Generate synthetic retail data similar to UCI Online Retail dataset structure.
    
    Columns: InvoiceNo, StockCode, Description, Quantity, InvoiceDate, UnitPrice, CustomerID, Country
    
    Args:
        num_rows (int): Number of transactions to generate (default: 1000)
    
    Returns:
        pd.DataFrame: Generated synthetic retail data
    """
    logger.info(f"Generating {num_rows} rows of synthetic retail data...")
    
    # Define data generation parameters
    countries = ['United Kingdom', 'France', 'Australia', 'Netherlands', 'Germany', 
                'Norway', 'EIRE', 'Switzerland', 'Spain', 'Poland', 'Portugal', 
                'Italy', 'Belgium', 'Lithuania', 'Japan', 'Iceland']
    
    product_categories = ['HOME', 'GARDEN', 'GIFT', 'OFFICE', 'KITCHEN', 'DECOR', 'TOY']
    
    # Generate base data
    start_date = datetime(2023, 1, 1)
    end_date = datetime(2023, 12, 31)
    
    data = []
    invoice_counter = 536365  # Starting invoice number (UCI format)
    customer_ids = list(range(12346, 12346 + 200))  # Pool of customer IDs
    
    for i in range(num_rows):
        # Generate invoice details
        if i % random.randint(1, 5) == 0:  # New invoice every 1-5 items
            invoice_counter += 1
            current_invoice = str(invoice_counter)
            current_customer = random.choice(customer_ids)
            current_country = random.choice(countries)
            invoice_date = fake.date_time_between(start_date=start_date, end_date=end_date)
        
        # Generate product details
        category = random.choice(product_categories)
        stock_code = f"{category[:2]}{random.randint(10000, 99999)}"
        description = f"{category.title()} {fake.word().title()} {fake.word().title()}"
        
        # Generate quantities and prices
        quantity = random.randint(1, 50)
        unit_price = round(random.uniform(0.5, 25.0), 2)
        
        # Add some cancelled orders (negative quantities)
        if random.random() < 0.05:  # 5% cancelled orders
            quantity = -quantity
            current_invoice = f"C{current_invoice}"  # Cancelled invoice prefix
        
        data.append({
            'InvoiceNo': current_invoice,
            'StockCode': stock_code,
            'Description': description,
            'Quantity': quantity,
            'InvoiceDate': invoice_date,
            'UnitPrice': unit_price,
            'CustomerID': current_customer,
            'Country': current_country
        })
    
    df = pd.DataFrame(data)
    
    # Add some missing values to simulate real-world data
    missing_indices = random.sample(range(len(df)), int(len(df) * 0.02))  # 2% missing
    df.loc[missing_indices, 'Description'] = None
    
    logger.info(f"Generated dataset shape: {df.shape}")
    logger.info(f"Date range: {df['InvoiceDate'].min()} to {df['InvoiceDate'].max()}")
    logger.info(f"Unique customers: {df['CustomerID'].nunique()}")
    logger.info(f"Unique products: {df['StockCode'].nunique()}")
    
    return df

def extract_data(raw_data):
    """
    Extract and validate raw data.
    
    Args:
        raw_data (pd.DataFrame): Raw retail data
        
    Returns:
        pd.DataFrame: Validated extracted data
    """
    logger.info("Starting data extraction phase...")
    
    extracted_data = raw_data.copy()
    original_rows = len(extracted_data)
    
    # Data type conversion
    extracted_data['InvoiceDate'] = pd.to_datetime(extracted_data['InvoiceDate'])
    extracted_data['Quantity'] = pd.to_numeric(extracted_data['Quantity'], errors='coerce')
    extracted_data['UnitPrice'] = pd.to_numeric(extracted_data['UnitPrice'], errors='coerce')
    extracted_data['CustomerID'] = pd.to_numeric(extracted_data['CustomerID'], errors='coerce')
    
    # Handle missing values
    missing_before = extracted_data.isnull().sum().sum()
    extracted_data['Description'] = extracted_data['Description'].fillna('Unknown Product')
    missing_after = extracted_data.isnull().sum().sum()
    
    # Remove rows with missing critical fields
    critical_fields = ['InvoiceNo', 'StockCode', 'Quantity', 'UnitPrice', 'CustomerID']
    extracted_data = extracted_data.dropna(subset=critical_fields)
    
    final_rows = len(extracted_data)
    
    logger.info(f"Extraction complete:")
    logger.info(f"  - Original rows: {original_rows}")
    logger.info(f"  - Missing values filled: {missing_before - missing_after}")
    logger.info(f"  - Final rows: {final_rows}")
    logger.info(f"  - Rows removed: {original_rows - final_rows}")
    
    return extracted_data

def transform_data(extracted_data):
    """
    Transform data for star schema dimensional model.
    
    Args:
        extracted_data (pd.DataFrame): Extracted data
        
    Returns:
        tuple: (sales_fact, customer_dim, product_dim, time_dim)
    """
    logger.info("Starting data transformation phase...")
    
    df = extracted_data.copy()
    
    # Calculate total sales
    df['TotalSales'] = df['Quantity'] * df['UnitPrice']
    
    # Filter for last year data only (2023)
    df = df[df['InvoiceDate'].dt.year == 2023]
    
    # Remove outliers (extremely high quantities or prices)
    df = df[(df['Quantity'].abs() <= 100) & (df['UnitPrice'] <= 100)]
    
    # Create Customer Dimension
    customer_dim = df[['CustomerID', 'Country']].drop_duplicates().reset_index(drop=True)
    customer_dim['CustomerKey'] = range(1, len(customer_dim) + 1)
    customer_dim = customer_dim[['CustomerKey', 'CustomerID', 'Country']]
    
    # Create Product Dimension
    product_dim = df[['StockCode', 'Description']].drop_duplicates().reset_index(drop=True)
    product_dim['ProductKey'] = range(1, len(product_dim) + 1)
    product_dim['Category'] = product_dim['StockCode'].str[:2]  # Extract category from stock code
    product_dim = product_dim[['ProductKey', 'StockCode', 'Description', 'Category']]
    
    # Create Time Dimension
    time_data = []
    for date in df['InvoiceDate'].dt.date.unique():
        dt = pd.to_datetime(date)
        time_data.append({
            'Date': date,
            'Year': dt.year,
            'Month': dt.month,
            'Day': dt.day,
            'Quarter': dt.quarter,
            'DayOfWeek': dt.dayofweek + 1,
            'MonthName': dt.strftime('%B'),
            'DayName': dt.strftime('%A')
        })
    
    time_dim = pd.DataFrame(time_data).drop_duplicates().reset_index(drop=True)
    time_dim['TimeKey'] = range(1, len(time_dim) + 1)
    time_dim = time_dim[['TimeKey', 'Date', 'Year', 'Month', 'Day', 'Quarter', 
                        'DayOfWeek', 'MonthName', 'DayName']]
    
    # Create Sales Fact table
    sales_fact = df.merge(customer_dim[['CustomerKey', 'CustomerID']], on='CustomerID')
    sales_fact = sales_fact.merge(product_dim[['ProductKey', 'StockCode']], on='StockCode')
    sales_fact = sales_fact.merge(time_dim[['TimeKey', 'Date']], 
                                 left_on=sales_fact['InvoiceDate'].dt.date, 
                                 right_on='Date')
    
    sales_fact = sales_fact[['InvoiceNo', 'CustomerKey', 'ProductKey', 'TimeKey', 
                           'Quantity', 'UnitPrice', 'TotalSales']]
    sales_fact['SalesKey'] = range(1, len(sales_fact) + 1)
    sales_fact = sales_fact[['SalesKey', 'InvoiceNo', 'CustomerKey', 'ProductKey', 
                           'TimeKey', 'Quantity', 'UnitPrice', 'TotalSales']]
    
    logger.info(f"Transformation complete:")
    logger.info(f"  - Sales Fact records: {len(sales_fact)}")
    logger.info(f"  - Customer Dimension records: {len(customer_dim)}")
    logger.info(f"  - Product Dimension records: {len(product_dim)}")
    logger.info(f"  - Time Dimension records: {len(time_dim)}")
    
    return sales_fact, customer_dim, product_dim, time_dim

def load_data_to_warehouse(sales_fact, customer_dim, product_dim, time_dim, db_path='retail_dw.db'):
    """
    Load transformed data into SQLite data warehouse.
    
    Args:
        sales_fact (pd.DataFrame): Sales fact table
        customer_dim (pd.DataFrame): Customer dimension
        product_dim (pd.DataFrame): Product dimension
        time_dim (pd.DataFrame): Time dimension
        db_path (str): Database file path
        
    Returns:
        bool: Success status
    """
    logger.info(f"Starting data loading to {db_path}...")
    
    try:
        # Remove existing database
        if os.path.exists(db_path):
            os.remove(db_path)
            logger.info(f"Removed existing database: {db_path}")
        
        # Create connection
        conn = sqlite3.connect(db_path)
        
        # Read and execute schema - try current directory first
        schema_path = 'warehouse_schema.sql'
        if not os.path.exists(schema_path):
            # Try parent directory
            schema_path = '../warehouse_schema.sql'
        
        if os.path.exists(schema_path):
            with open(schema_path, 'r') as f:
                schema_sql = f.read()
            conn.executescript(schema_sql)
            logger.info(f"Database schema created successfully from {schema_path}")
        else:
            logger.warning(f"Schema file not found, creating basic tables")
            # Create basic schema if file doesn't exist
            basic_schema = """
            CREATE TABLE CustomerDim (
                CustomerKey INTEGER PRIMARY KEY,
                CustomerID INTEGER,
                Country TEXT
            );
            
            CREATE TABLE ProductDim (
                ProductKey INTEGER PRIMARY KEY,
                StockCode TEXT,
                Description TEXT,
                Category TEXT
            );
            
            CREATE TABLE TimeDim (
                TimeKey INTEGER PRIMARY KEY,
                Date DATE,
                Year INTEGER,
                Month INTEGER,
                Day INTEGER,
                Quarter INTEGER,
                DayOfWeek INTEGER,
                MonthName TEXT,
                DayName TEXT
            );
            
            CREATE TABLE SalesFact (
                SalesKey INTEGER PRIMARY KEY,
                InvoiceNo TEXT,
                CustomerKey INTEGER,
                ProductKey INTEGER,
                TimeKey INTEGER,
                Quantity INTEGER,
                UnitPrice DECIMAL(10,2),
                TotalSales DECIMAL(10,2),
                FOREIGN KEY (CustomerKey) REFERENCES CustomerDim(CustomerKey),
                FOREIGN KEY (ProductKey) REFERENCES ProductDim(ProductKey),
                FOREIGN KEY (TimeKey) REFERENCES TimeDim(TimeKey)
            );
            """
            conn.executescript(basic_schema)
        
        # Load dimension tables first
        customer_dim.to_sql('CustomerDim', conn, if_exists='append', index=False)
        product_dim.to_sql('ProductDim', conn, if_exists='append', index=False)
        time_dim.to_sql('TimeDim', conn, if_exists='append', index=False)
        
        # Load fact table
        sales_fact.to_sql('SalesFact', conn, if_exists='append', index=False)
        
        # Verify data integrity
        cursor = conn.cursor()
        
        # Check row counts
        tables = ['CustomerDim', 'ProductDim', 'TimeDim', 'SalesFact']
        for table in tables:
            cursor.execute(f"SELECT COUNT(*) FROM {table}")
            count = cursor.fetchone()[0]
            logger.info(f"  - {table}: {count} rows loaded")
        
        # Verify foreign key integrity
        cursor.execute("""
            SELECT COUNT(*) FROM SalesFact sf
            LEFT JOIN CustomerDim cd ON sf.CustomerKey = cd.CustomerKey
            LEFT JOIN ProductDim pd ON sf.ProductKey = pd.ProductKey
            LEFT JOIN TimeDim td ON sf.TimeKey = td.TimeKey
            WHERE cd.CustomerKey IS NULL OR pd.ProductKey IS NULL OR td.TimeKey IS NULL
        """)
        orphaned_records = cursor.fetchone()[0]
        
        if orphaned_records > 0:
            logger.warning(f"Found {orphaned_records} orphaned records in SalesFact")
        else:
            logger.info("✅ All foreign key constraints verified")
        
        conn.close()
        
        logger.info(f"✅ Data successfully loaded to {db_path}")
        return True
        
    except Exception as e:
        logger.error(f"Load phase failed: {str(e)}")
        return False

def perform_full_etl(num_rows=1000, db_path='retail_dw.db'):
    """
    Perform the complete ETL process and log the number of rows processed at each stage.
    
    Args:
        num_rows (int): Number of rows to generate
        db_path (str): Database file path
        
    Returns:
        dict: ETL process summary with row counts
    """
    start_time = datetime.now()
    logger.info("="*60)
    logger.info("STARTING COMPLETE ETL PROCESS")
    logger.info("="*60)
    
    try:
        # Stage 1: Generate/Extract
        logger.info("STAGE 1: DATA GENERATION & EXTRACTION")
        raw_data = generate_synthetic_retail_data(num_rows)
        extracted_data = extract_data(raw_data)
        extract_rows = len(extracted_data)
        
        # Stage 2: Transform
        logger.info("STAGE 2: DATA TRANSFORMATION")
        sales_fact, customer_dim, product_dim, time_dim = transform_data(extracted_data)
        transform_rows = len(sales_fact)
        
        # Stage 3: Load
        logger.info("STAGE 3: DATA LOADING")
        load_success = load_data_to_warehouse(sales_fact, customer_dim, product_dim, time_dim, db_path)
        
        end_time = datetime.now()
        duration = (end_time - start_time).total_seconds()
        
        # ETL Summary
        summary = {
            'status': 'SUCCESS' if load_success else 'FAILED',
            'start_time': start_time.strftime('%Y-%m-%d %H:%M:%S'),
            'end_time': end_time.strftime('%Y-%m-%d %H:%M:%S'),
            'duration_seconds': duration,
            'rows_generated': num_rows,
            'rows_extracted': extract_rows,
            'rows_transformed': transform_rows,
            'fact_table_rows': len(sales_fact),
            'customer_dimension_rows': len(customer_dim),
            'product_dimension_rows': len(product_dim),
            'time_dimension_rows': len(time_dim),
            'database_file': db_path
        }
        
        logger.info("="*60)
        logger.info("ETL PROCESS COMPLETED")
        logger.info(f"Status: {summary['status']}")
        logger.info(f"Total Duration: {duration:.2f} seconds")
        logger.info(f"Rows Processed: {num_rows} → {extract_rows} → {transform_rows}")
        logger.info("="*60)
        
        return summary
        
    except Exception as e:
        logger.error(f"ETL process failed: {str(e)}")
        return {'status': 'FAILED', 'error': str(e)}

print("✅ ETL functions defined successfully")

✅ ETL functions defined successfully


In [5]:
# =============================================================================
# EXECUTE COMPLETE ETL PROCESS
# =============================================================================

print("\n" + "="*60)
print("COMPLETE ETL PROCESS EXECUTION")
print("="*60)

# Execute ETL process with 1000 rows
etl_summary = perform_full_etl(1000, 'retail_dw.db')

# Display final summary
print("\nETL PROCESS SUMMARY")
print("-" * 30)
for key, value in etl_summary.items():
    if key != 'error':
        print(f"{key.replace('_', ' ').title()}: {value}")

# Verify final database state
if etl_summary['status'] == 'SUCCESS':
    print("\n✅ ETL Process completed successfully!")
    print("Database 'retail_dw.db' is ready for OLAP queries.")
    
    # Quick verification query
    import sqlite3
    conn = sqlite3.connect('retail_dw.db')
    
    print("\n📊 Quick Database Verification:")
    verification_query = """
    SELECT 
        'Total Sales Revenue' as Metric,
        printf('$%.2f', SUM(TotalSales)) as Value
    FROM SalesFact
    UNION ALL
    SELECT 
        'Total Transactions' as Metric,
        COUNT(*) as Value
    FROM SalesFact
    UNION ALL
    SELECT 
        'Unique Customers' as Metric,
        COUNT(DISTINCT CustomerKey) as Value
    FROM SalesFact
    UNION ALL
    SELECT 
        'Unique Products' as Metric,
        COUNT(*) as Value
    FROM ProductDim
    """
    
    results = pd.read_sql_query(verification_query, conn)
    for _, row in results.iterrows():
        print(f"  - {row['Metric']}: {row['Value']}")
    
    conn.close()
else:
    print("\n❌ ETL Process failed. Check logs for details.")
    if 'error' in etl_summary:
        print(f"Error: {etl_summary['error']}")

2025-08-13 16:04:57,887 - INFO - STARTING COMPLETE ETL PROCESS
2025-08-13 16:04:57,903 - INFO - STAGE 1: DATA GENERATION & EXTRACTION
2025-08-13 16:04:57,903 - INFO - Generating 1000 rows of synthetic retail data...
2025-08-13 16:04:58,048 - INFO - Generated dataset shape: (1000, 8)
2025-08-13 16:04:58,048 - INFO - Date range: 2023-01-04 04:13:08 to 2023-12-30 18:23:29
2025-08-13 16:04:58,063 - INFO - Unique customers: 177
2025-08-13 16:04:58,063 - INFO - Unique products: 999
2025-08-13 16:04:58,063 - INFO - Starting data extraction phase...
2025-08-13 16:04:58,079 - INFO - Extraction complete:



COMPLETE ETL PROCESS EXECUTION


2025-08-13 16:04:58,079 - INFO -   - Original rows: 1000
2025-08-13 16:04:58,079 - INFO -   - Missing values filled: 20
2025-08-13 16:04:58,079 - INFO -   - Final rows: 1000
2025-08-13 16:04:58,079 - INFO -   - Rows removed: 0
2025-08-13 16:04:58,079 - INFO - STAGE 2: DATA TRANSFORMATION
2025-08-13 16:04:58,095 - INFO - Starting data transformation phase...
2025-08-13 16:04:58,171 - INFO - Transformation complete:
2025-08-13 16:04:58,187 - INFO -   - Sales Fact records: 3101
2025-08-13 16:04:58,187 - INFO -   - Customer Dimension records: 419
2025-08-13 16:04:58,187 - INFO -   - Product Dimension records: 1000
2025-08-13 16:04:58,187 - INFO -   - Time Dimension records: 245
2025-08-13 16:04:58,187 - INFO - STAGE 3: DATA LOADING
2025-08-13 16:04:58,187 - INFO - Starting data loading to retail_dw.db...
2025-08-13 16:04:58,187 - INFO - Removed existing database: retail_dw.db
2025-08-13 16:04:58,303 - INFO - Database schema created successfully from warehouse_schema.sql
2025-08-13 16:04:58


ETL PROCESS SUMMARY
------------------------------
Status: SUCCESS
Start Time: 2025-08-13 16:04:57
End Time: 2025-08-13 16:04:58
Duration Seconds: 0.555013
Rows Generated: 1000
Rows Extracted: 1000
Rows Transformed: 3101
Fact Table Rows: 3101
Customer Dimension Rows: 419
Product Dimension Rows: 1000
Time Dimension Rows: 245
Database File: retail_dw.db

✅ ETL Process completed successfully!
Database 'retail_dw.db' is ready for OLAP queries.

📊 Quick Database Verification:
  - Total Sales Revenue: $881802.21
  - Total Transactions: 3101
  - Unique Customers: 419
  - Unique Products: 1000
