## Task 2: ETL Process Implementation 

In [None]:

import logging
import os
import sqlite3
import sys
from contextlib import closing
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
from faker import Faker

# Logging configuration: every record is written both to 'etl_retail.log' and
# to the console (stderr, the StreamHandler default).
# mode='w' truncates the log file when the handler is created, so each run
# starts with a fresh log. The handler opens the file right here, at import
# time, so the file must not be deleted later in the script; that would only
# unlink the file the handler is still writing to.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('etl_retail.log', mode='w', encoding='utf-8'),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)

# Extract phase
def generate_synthetic_data(num_rows=1000, seed=None):
    """Generate synthetic retail data similar to the UCI Online Retail dataset.

    Args:
        num_rows: Number of rows to generate.
        seed: Optional integer seed for reproducible output. When given, NumPy
            draws come from a private ``np.random.RandomState(seed)`` and the
            Faker instance is seeded with the same value. When ``None`` (the
            default), the global NumPy RNG is used, so ``np.random.seed()``
            still controls the output as before.

    Returns:
        DataFrame with columns InvoiceNo, StockCode, Description, Quantity,
        InvoiceDate, UnitPrice, CustomerID and Country. Roughly 5% of the
        CustomerID and Description values are missing. They are real NaN
        values, so ``isna()``/``fillna()`` detect them.
    """
    logger.info(f"Generating {num_rows} synthetic records...")
    rng = np.random if seed is None else np.random.RandomState(seed)
    fake = Faker()
    if seed is not None:
        fake.seed_instance(seed)

    # Vectorised draws; the value ranges match the original per-row calls.
    invoice_ids = rng.randint(10000, 99999, num_rows)
    stock_ids = rng.randint(100000, 999999, num_rows)
    customer_ids = rng.randint(10000, 11000, num_rows)

    df = pd.DataFrame({
        'InvoiceNo': [f'INV-{n}' for n in invoice_ids],
        'StockCode': [f'SKU-{n}' for n in stock_ids],
        'Description': [fake.catch_phrase() for _ in range(num_rows)],
        'Quantity': rng.randint(1, 50, num_rows),
        'InvoiceDate': pd.date_range(start='2023-01-01', end='2025-08-12', periods=num_rows),
        'UnitPrice': np.round(rng.uniform(1, 100, num_rows), 2),
        'CustomerID': [f'CUST-{n}' for n in customer_ids],
        'Country': rng.choice(['UK', 'USA', 'Germany', 'France', 'Japan'], num_rows),
    })

    # Introduce ~5% missing values in specific columns. Series.mask inserts
    # genuine NaN. np.where(mask, np.nan, <list of str>) would promote NaN to
    # the *string* 'nan', which fillna() does not treat as missing.
    for col in ('CustomerID', 'Description'):
        df[col] = df[col].mask(rng.random(num_rows) < 0.05)

    logger.info(f"Synthetic data generated with {len(df)} records")
    return df

# Transform Phase
def transform_data(df, reference_date=datetime(2025, 8, 12), lookback_days=365):
    """Clean raw sales rows and derive the fact table plus two dimensions.

    Steps:
    1. Parse InvoiceDate and drop rows that cannot be parsed.
    2. Replace missing CustomerID/Description with 'UNKNOWN'.
    3. Compute TotalSales = Quantity * UnitPrice.
    4. Keep rows with positive Quantity, UnitPrice and TotalSales.
    5. Keep rows dated on or after ``reference_date - lookback_days``.

    The input DataFrame is never modified; all work happens on a copy.

    Args:
        df: Raw sales DataFrame with the columns InvoiceNo, Description,
            Quantity, InvoiceDate, UnitPrice, CustomerID and Country.
        reference_date: End of the analysis window (default 2025-08-12).
        lookback_days: Window length in days (default 365, i.e. the
            window starts 2024-08-12).

    Returns:
        Tuple ``(fact, customer_dim, time_dim)``:
        fact: the cleaned, filtered rows plus a TotalSales column.
        customer_dim: one row per CustomerID with TotalPurchases,
            AvgPurchase, Country (first seen), OrderCount (distinct
            invoices), FirstPurchase and LastPurchase.
        time_dim: one row per distinct calendar date, keyed by the
            integer DateKey (YYYYMMDD).
    """
    logger.info("Starting data transformation...")

    # Copy first: the caller's frame stays untouched, and every later column
    # assignment targets a frame we own (no chained-assignment pitfalls).
    df = df.copy()

    # 1. Convert and validate InvoiceDate
    df['InvoiceDate'] = pd.to_datetime(df['InvoiceDate'], errors='coerce')
    df = df.dropna(subset=['InvoiceDate'])

    # 2. Handle missing values. A dict passed to DataFrame.fillna replaces the
    # chained `df[col].fillna(..., inplace=True)`, which pandas warns will stop
    # working once Copy-on-Write is the default (pandas 3.0).
    df = df.fillna({'CustomerID': 'UNKNOWN', 'Description': 'UNKNOWN'})

    # 3. Calculate TotalSales
    df['TotalSales'] = df['Quantity'] * df['UnitPrice']

    # 4. Filter valid records
    df = df[(df['Quantity'] > 0) & (df['UnitPrice'] > 0) & (df['TotalSales'] > 0)]

    # 5. Keep only the trailing window (default: Aug 12, 2024 - Aug 12, 2025).
    # .copy() makes the returned fact table an independent frame, so callers
    # can add columns to it without SettingWithCopy warnings.
    cutoff_date = reference_date - timedelta(days=lookback_days)
    df = df[df['InvoiceDate'] >= cutoff_date].copy()

    # 6. Customer dimension. Named aggregation ties each output column to its
    # source explicitly instead of relying on positional renaming.
    customer_dim = df.groupby('CustomerID').agg(
        TotalPurchases=('TotalSales', 'sum'),
        AvgPurchase=('TotalSales', 'mean'),
        Country=('Country', 'first'),
        OrderCount=('InvoiceNo', 'nunique'),
        FirstPurchase=('InvoiceDate', 'min'),
        LastPurchase=('InvoiceDate', 'max'),
    ).reset_index()

    # 7. Time dimension. Every column is derived from the calendar date alone,
    # so de-duplicating on DateKey yields exactly one row per date.
    dates = df['InvoiceDate'].dt
    time_dim = pd.DataFrame({
        'DateKey': dates.strftime('%Y%m%d').astype(int),
        'Date': dates.date,
        'DayOfWeek': dates.day_name(),
        'DayOfMonth': dates.day,
        'WeekOfYear': dates.isocalendar().week,
        'Month': dates.month,
        'Quarter': dates.quarter,
        'Year': dates.year,
        'IsWeekend': dates.dayofweek > 4,  # dayofweek: Monday=0 ... Sunday=6
    }).drop_duplicates(subset='DateKey')

    logger.info(f"Transformation complete. Valid records: {len(df)}")
    return df, customer_dim, time_dim

# Load phase
def _remove_failed_db(db_path):
    """Best-effort removal of a partially written database file; never raises."""
    if not os.path.exists(db_path):
        return
    try:
        os.remove(db_path)
        logger.info("Cleaned up failed database file")
    except OSError as cleanup_error:
        # Still best-effort, but leave a trace instead of silently passing.
        logger.warning(f"Could not remove failed database file {db_path}: {cleanup_error}")


def load_to_database(fact_data, customer_dim, time_dim, db_path='retail_dw.db'):
    """(Re)create the SQLite star schema at ``db_path`` and load the three tables.

    An existing file at ``db_path`` is deleted first. If loading fails, or
    the foreign-key check finds violations, the partially written file is
    removed again.

    Args:
        fact_data: Sales rows. Must contain a datetime ``InvoiceDate``
            column, which is used to derive ``DateKey``. It is not modified.
        customer_dim: Customer dimension rows, one per CustomerID.
        time_dim: Time dimension rows, one per DateKey.
        db_path: Database file to (re)create. Defaults to 'retail_dw.db'.

    Returns:
        True on success; False on any failure (the reason is logged).
    """
    logger.info("Initializing database load...")

    # Ensure clean state
    if os.path.exists(db_path):
        try:
            os.remove(db_path)
            logger.info("Removed existing database file")
        except OSError as e:
            logger.error(f"Failed to remove old DB: {str(e)}")
            return False

    try:
        # closing() guarantees the connection is closed on every exit path.
        # sqlite3's own context manager only commits or rolls back and never
        # closes, which left the file handle open (and os.remove failing on
        # platforms that lock open files).
        with closing(sqlite3.connect(db_path)) as conn:
            # Enable foreign key constraints (off by default in SQLite)
            conn.execute("PRAGMA foreign_keys = ON")

            conn.executescript('''
            CREATE TABLE IF NOT EXISTS CustomerDim (
                CustomerID TEXT PRIMARY KEY,
                TotalPurchases REAL NOT NULL,
                AvgPurchase REAL,
                Country TEXT,
                OrderCount INTEGER,
                FirstPurchase TEXT,
                LastPurchase TEXT
            );
            
            CREATE TABLE IF NOT EXISTS TimeDim (
                DateKey INTEGER PRIMARY KEY,
                Date TEXT NOT NULL,
                DayOfWeek TEXT,
                DayOfMonth INTEGER,
                WeekOfYear INTEGER,
                Month INTEGER,
                Quarter INTEGER,
                Year INTEGER,
                IsWeekend BOOLEAN
            );
            
            CREATE TABLE IF NOT EXISTS SalesFact (
                InvoiceNo TEXT,
                StockCode TEXT,
                Description TEXT,
                Quantity INTEGER NOT NULL,
                UnitPrice REAL NOT NULL,
                TotalSales REAL NOT NULL,
                InvoiceDate TEXT NOT NULL,
                CustomerID TEXT NOT NULL,
                Country TEXT,
                DateKey INTEGER NOT NULL,
                FOREIGN KEY (CustomerID) REFERENCES CustomerDim(CustomerID),
                FOREIGN KEY (DateKey) REFERENCES TimeDim(DateKey),
                CHECK (Quantity > 0),
                CHECK (UnitPrice > 0),
                CHECK (TotalSales > 0)
            );
            ''')

            # Dimensions are loaded before facts so the FK targets exist.
            with conn:
                logger.info("Loading dimension tables...")
                customer_dim.to_sql('CustomerDim', conn, if_exists='append', index=False)
                time_dim.to_sql('TimeDim', conn, if_exists='append', index=False)

                logger.info("Preparing fact data...")
                # assign() builds a new frame, so the caller's fact_data is not mutated.
                fact_rows = fact_data.assign(
                    DateKey=fact_data['InvoiceDate'].dt.strftime('%Y%m%d').astype(int)
                )
                fact_rows.to_sql('SalesFact', conn, if_exists='append', index=False)

            # Verify data integrity
            counts = {
                table: conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
                for table in ('SalesFact', 'CustomerDim', 'TimeDim')
            }
            logger.info(
                f"Database loaded successfully with:\n"
                f"- {counts['SalesFact']} fact records\n"
                f"- {counts['CustomerDim']} customers\n"
                f"- {counts['TimeDim']} time dimensions"
            )

            # PRAGMA foreign_key_check does not raise on violations; it returns
            # one row per violating row, so the result must be inspected.
            violations = conn.execute("PRAGMA foreign_key_check").fetchall()

    except Exception as e:
        logger.error(f"Database operation failed: {str(e)}")
        _remove_failed_db(db_path)
        return False

    if violations:
        logger.error(
            f"Foreign key violation: {len(violations)} row(s), first: {violations[0]}"
        )
        _remove_failed_db(db_path)
        return False

    logger.info("Foreign key validation passed")
    return True


# Full ETL pipeline execution with comprehensive logging
def run_etl_pipeline():
    """Run extract, transform and load once, end to end.

    Returns:
        True when every phase finished and the database load reported
        success, otherwise False. No exception escapes. Any failure is
        logged at CRITICAL level with its traceback attached.
    """
    try:
        logger.info("=== ETL PIPELINE STARTED ===")

        # Extract: 1000 synthetic rows
        logger.info("-- EXTRACT PHASE --")
        raw_rows = generate_synthetic_data(1000)
        logger.info(f"Extracted {len(raw_rows)} raw records")

        # Transform: clean rows and build the two dimensions
        logger.info("-- TRANSFORM PHASE --")
        sales, customers, calendar = transform_data(raw_rows)

        # Load: a False return is turned into an exception so it shares the
        # single failure path below.
        logger.info("-- LOAD PHASE --")
        loaded_ok = load_to_database(sales, customers, calendar)
        if loaded_ok is False or not loaded_ok:
            raise RuntimeError("Database load failed")

        logger.info("=== ETL PIPELINE COMPLETED SUCCESSFULLY ===")
        return True
    except Exception as failure:
        logger.critical(f"ETL pipeline failed: {str(failure)}", exc_info=True)
        return False

if __name__ == "__main__":
    # The log file is not cleared here. The FileHandler configured in
    # logging.basicConfig already opened 'etl_retail.log' when the module was
    # loaded, so deleting it now would only unlink a file that is still being
    # written. The former code removed 'etl.log', a file this script never writes.
    DB_FILE = 'retail_dw.db'

    # Execute pipeline
    success = run_etl_pipeline()

    # Final verification: report the file size and the tables that exist.
    if success and os.path.exists(DB_FILE):
        logger.info(f"Database verification: File size {os.path.getsize(DB_FILE)} bytes")
        try:
            # closing(): sqlite3's own context manager commits but never closes.
            with closing(sqlite3.connect(DB_FILE)) as conn:
                tables = pd.read_sql("SELECT name FROM sqlite_master WHERE type='table'", conn)
            logger.info(f"Tables created:\n{tables}")
        except Exception as e:
            logger.error(f"Verification failed: {str(e)}")

    # sys.exit instead of the site-provided exit(): that helper is meant for
    # interactive shells and is absent when Python runs with -S.
    sys.exit(0 if success else 1)

2025-08-13 01:43:07,429 - INFO - === ETL PIPELINE STARTED ===
2025-08-13 01:43:07,430 - INFO - -- EXTRACT PHASE --
2025-08-13 01:43:07,431 - INFO - Generating 1000 synthetic records...
2025-08-13 01:43:07,604 - INFO - Synthetic data generated with 1000 records
2025-08-13 01:43:07,604 - INFO - Extracted 1000 raw records
2025-08-13 01:43:07,604 - INFO - -- TRANSFORM PHASE --
2025-08-13 01:43:07,604 - INFO - Starting data transformation...
The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  df['CustomerID'].fillna('UNKNOWN', inplace=True)
The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are se

: 