In [1]:
# etl_retail_fixed.py
import sqlite3
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import logging
import random

# Emit INFO-and-above records through the root handler so the pipeline's
# progress messages are visible when the script runs.
_LOG_LEVEL = logging.INFO
logging.basicConfig(level=_LOG_LEVEL)

# Module logger used by the RetailETL pipeline for progress/error messages.
logger = logging.getLogger(name=__name__)

class RetailETL:
    """Extract-transform-load pipeline for a synthetic retail star schema.

    * Extract: ``generate_synthetic_data`` builds products, customers and
      sales lines with random values (no Faker dependency).
    * Transform: ``transform_data`` cleans the sales lines and keeps the
      365 days before ``current_date``.
    * Load: ``create_database`` drops and recreates the SQLite schema and
      ``load_data`` fills ProductDim, CustomerDim, TimeDim and SalesFact.

    Args:
        db_file: Path of the SQLite database file.
        current_date: Reference "today" (a ``datetime``) for all relative
            dates. Defaults to 2025-08-12, as fixed by the exam instructions.
        seed: Optional seed for reproducible data. When ``None`` the global
            ``random`` module generator is used.
    """

    # Reference date fixed by the exam instructions.
    DEFAULT_CURRENT_DATE = datetime(2025, 8, 12)

    # Columns of the raw sales frame, in order. Fixed explicitly so that an
    # empty extract still has the columns transform_data/load_data index into.
    RAW_COLUMNS = [
        'InvoiceNo', 'StockCode', 'Description', 'Quantity', 'InvoiceDate',
        'UnitPrice', 'CustomerID', 'Country', 'ProductID', 'Category',
        'TotalSales',
    ]

    # Raw column -> SalesFact column; dict order is the SalesFact column order.
    _FACT_COLUMNS = {
        'CustomerID': 'customer_id',
        'ProductID': 'product_id',
        'time_id': 'time_id',
        'Quantity': 'quantity',
        'UnitPrice': 'unit_price',
        'TotalSales': 'total_sales',
    }

    _TIME_DIM_COLUMNS = [
        'time_id', 'date', 'day', 'month', 'quarter', 'year', 'is_weekend',
    ]

    # Star-schema DDL. Existing tables are dropped so every run starts fresh.
    _SCHEMA_SQL = '''
        DROP TABLE IF EXISTS SalesFact;
        DROP TABLE IF EXISTS CustomerDim;
        DROP TABLE IF EXISTS ProductDim;
        DROP TABLE IF EXISTS TimeDim;

        CREATE TABLE CustomerDim (
            customer_id INTEGER PRIMARY KEY,
            name TEXT NOT NULL,
            location TEXT,
            country TEXT,
            demographic_segment TEXT,
            registration_date DATE
        );

        CREATE TABLE ProductDim (
            product_id INTEGER PRIMARY KEY,
            name TEXT NOT NULL,
            category TEXT NOT NULL,
            subcategory TEXT,
            supplier TEXT,
            current_price REAL
        );

        CREATE TABLE TimeDim (
            time_id INTEGER PRIMARY KEY,
            date DATE NOT NULL,
            day INTEGER NOT NULL,
            month INTEGER NOT NULL,
            quarter INTEGER NOT NULL,
            year INTEGER NOT NULL,
            is_weekend BOOLEAN NOT NULL
        );

        CREATE TABLE SalesFact (
            fact_sales_id INTEGER PRIMARY KEY AUTOINCREMENT,
            customer_id INTEGER NOT NULL,
            product_id INTEGER NOT NULL,
            time_id INTEGER NOT NULL,
            quantity INTEGER NOT NULL,
            unit_price REAL NOT NULL,
            total_sales REAL NOT NULL,
            FOREIGN KEY (customer_id) REFERENCES CustomerDim (customer_id),
            FOREIGN KEY (product_id) REFERENCES ProductDim (product_id),
            FOREIGN KEY (time_id) REFERENCES TimeDim (time_id)
        );

        CREATE INDEX idx_sales_customer ON SalesFact(customer_id);
        CREATE INDEX idx_sales_product ON SalesFact(product_id);
        CREATE INDEX idx_sales_time ON SalesFact(time_id);
        '''

    def __init__(self, db_file='retail_dw.db', current_date=None, seed=None):
        self.db_file = db_file
        self.current_date = (
            self.DEFAULT_CURRENT_DATE if current_date is None else current_date
        )
        # random.Random and the random module share the choice/randint/uniform
        # API, so an unseeded instance keeps drawing from the global generator.
        self._rng = random if seed is None else random.Random(seed)

        # Manual alternatives for Faker data
        self.first_names = ['John', 'Jane', 'Robert', 'Emily', 'Michael', 'Sarah', 'David', 'Lisa']
        self.last_names = ['Smith', 'Johnson', 'Williams', 'Brown', 'Jones', 'Miller', 'Davis']
        self.cities = ['New York', 'London', 'Berlin', 'Paris', 'Tokyo', 'Sydney', 'Toronto']
        self.companies = ['TechCorp', 'GlobalMart', 'HomeGoods', 'FashionCo', 'ElectroWorld']

    def generate_name(self):
        """Return a random "First Last" customer name."""
        return f"{self._rng.choice(self.first_names)} {self._rng.choice(self.last_names)}"

    def generate_synthetic_data(self, num_records=1000):
        """Generate synthetic retail data without Faker.

        Args:
            num_records: Number of sales lines to generate.

        Returns:
            A tuple ``(df, products, customers)``. ``df`` holds one row per
            sales line with the columns in ``RAW_COLUMNS`` (present even when
            ``num_records`` is 0). ``products`` is a list of 85 product dicts
            (5 per subcategory) and ``customers`` a list of 100 customer dicts.
        """
        logger.info("Generating %d synthetic retail records", num_records)
        rng = self._rng

        # Product categories and subcategories
        categories = {
            'Electronics': ['Smartphones', 'Laptops', 'Tablets', 'Accessories'],
            'Clothing': ['Men', 'Women', 'Kids', 'Accessories'],
            'Home': ['Furniture', 'Decor', 'Kitchen'],
            'Books': ['Fiction', 'Non-Fiction', 'Educational'],
            'Sports': ['Equipment', 'Apparel', 'Footwear'],
        }

        # Products: product_id is a running counter, hence unique.
        products = []
        product_id = 1
        for cat, subcats in categories.items():
            for subcat in subcats:
                for _ in range(5):  # 5 products per subcategory
                    products.append({
                        'product_id': product_id,
                        'name': f"{cat[:3]}-{subcat[:3]}-{product_id:03d}",
                        'category': cat,
                        'subcategory': subcat,
                        'supplier': rng.choice(self.companies),
                        'current_price': round(rng.uniform(10, 500), 2),
                    })
                    product_id += 1

        # Customers: ids 1..100, registered 1-3 years before current_date.
        countries = ['UK', 'USA', 'Germany', 'France', 'Australia', 'Japan', 'Canada']
        customers = []
        for customer_id in range(1, 101):
            customers.append({
                'customer_id': customer_id,
                'name': self.generate_name(),
                'location': rng.choice(self.cities),
                'country': rng.choice(countries),
                'demographic_segment': rng.choice(['Young', 'Adult', 'Senior', 'Student']),
                'registration_date': (
                    self.current_date - timedelta(days=rng.randint(365, 1095))
                ).strftime('%Y-%m-%d'),
            })

        # Sales lines
        data = []
        for _ in range(num_records):
            product = rng.choice(products)
            customer = rng.choice(customers)

            # Random date 1-730 days (about two years) before current_date
            days_ago = rng.randint(1, 730)
            date = (self.current_date - timedelta(days=days_ago)).strftime('%Y-%m-%d')

            quantity = rng.randint(1, 50)
            # Round the price before deriving the total, so that
            # TotalSales == Quantity * UnitPrice to the cent.
            unit_price = round(product['current_price'] * rng.uniform(0.8, 1.2), 2)
            total_sales = round(quantity * unit_price, 2)

            data.append({
                'InvoiceNo': f"INV{rng.randint(1000, 9999)}",
                'StockCode': f"SKU{product['product_id']:04d}",
                'Description': product['name'],
                'Quantity': quantity,
                'InvoiceDate': date,
                'UnitPrice': unit_price,
                'CustomerID': customer['customer_id'],
                'Country': customer['country'],
                'ProductID': product['product_id'],
                'Category': product['category'],
                'TotalSales': total_sales,
            })

        df = pd.DataFrame(data, columns=self.RAW_COLUMNS)
        logger.info("Synthetic data generated with %d records", len(df))
        return df, products, customers

    def transform_data(self, df):
        """Clean raw sales lines and keep only the last 365 days.

        Parses InvoiceDate, drops rows missing CustomerID or UnitPrice,
        drops rows with a non-positive Quantity or UnitPrice, and keeps rows
        dated on or after ``current_date - 365 days``. The input frame is
        not modified.

        Args:
            df: Raw sales frame (see ``generate_synthetic_data``).

        Returns:
            A new, independent DataFrame with the surviving rows (original
            index preserved).
        """
        logger.info("Starting data transformation")

        # Work on a copy so the caller's raw frame is left untouched.
        df = df.copy()
        df['InvoiceDate'] = pd.to_datetime(df['InvoiceDate'])

        # Rows without a customer or price cannot be loaded into SalesFact.
        df = df.dropna(subset=['CustomerID', 'UnitPrice'])

        # Drop non-positive quantities/prices (returns or invalid lines).
        df = df[(df['Quantity'] > 0) & (df['UnitPrice'] > 0)]

        # Keep the last year of data. The final copy() detaches the result
        # from its parent, so later column assignments don't warn.
        one_year_ago = self.current_date - timedelta(days=365)
        df = df[df['InvoiceDate'] >= one_year_ago].copy()

        logger.info("Data transformed. %d records remaining after transformations", len(df))
        return df

    def create_database(self):
        """Create the star-schema tables in ``self.db_file``.

        Existing SalesFact, CustomerDim, ProductDim and TimeDim tables are
        dropped first, so previously loaded data is discarded.
        """
        conn = sqlite3.connect(self.db_file)
        try:
            conn.executescript(self._SCHEMA_SQL)
            conn.commit()
        finally:
            # Close even when the DDL fails, so the connection isn't leaked.
            conn.close()
        logger.info("Database schema created (tables dropped and recreated)")

    @classmethod
    def _build_time_dim(cls, invoice_dates):
        """Return one TimeDim row per distinct calendar date in ``invoice_dates``."""
        rows = [
            {
                # YYYYMMDD encoding; load_data derives fact time_ids the same way.
                'time_id': cal.year * 10000 + cal.month * 100 + cal.day,
                # ISO text, written explicitly instead of relying on sqlite3's
                # deprecated default date adapter.
                'date': cal.isoformat(),
                'day': cal.day,
                'month': cal.month,
                'quarter': (cal.month - 1) // 3 + 1,
                'year': cal.year,
                'is_weekend': cal.weekday() >= 5,
            }
            for cal in sorted(set(invoice_dates.dropna().dt.date))
        ]
        return pd.DataFrame(rows, columns=cls._TIME_DIM_COLUMNS)

    def load_data(self, df, products, customers):
        """Load the dimensions and the sales fact table into the database.

        The tables must already exist (see ``create_database``). Dimension
        rows are de-duplicated on their primary key. Fact rows whose customer,
        product or date is missing from the corresponding dimension are dropped
        and counted in a warning. ``df`` itself is not modified.

        Args:
            df: Transformed sales frame (output of ``transform_data``).
            products: Product dicts keyed like ProductDim's columns.
            customers: Customer dicts keyed like CustomerDim's columns.

        Raises:
            Exception: any database or pandas error is logged and re-raised.
        """
        conn = sqlite3.connect(self.db_file)

        try:
            # SQLite ignores FOREIGN KEY clauses unless this pragma is enabled
            # on the connection.
            conn.execute('PRAGMA foreign_keys = ON')

            # Load ProductDim - ensure no duplicates
            product_dim = pd.DataFrame(products).drop_duplicates('product_id')
            product_dim.to_sql('ProductDim', conn, if_exists='append', index=False)
            logger.info("Loaded %d unique records into ProductDim", len(product_dim))

            # Load CustomerDim - ensure no duplicates
            customer_dim = pd.DataFrame(customers).drop_duplicates('customer_id')
            customer_dim.to_sql('CustomerDim', conn, if_exists='append', index=False)
            logger.info("Loaded %d unique records into CustomerDim", len(customer_dim))

            # Load TimeDim
            invoice_dates = pd.to_datetime(df['InvoiceDate'])
            time_dim = self._build_time_dim(invoice_dates)
            time_dim.to_sql('TimeDim', conn, if_exists='append', index=False)
            logger.info("Loaded %d unique records into TimeDim", len(time_dim))

            # Build SalesFact. assign() returns a new frame, so the caller's df
            # gains no time_id column. Missing dates yield NaN keys, which the
            # foreign-key filter below removes.
            time_ids = (
                invoice_dates.dt.year * 10000
                + invoice_dates.dt.month * 100
                + invoice_dates.dt.day
            )
            fact_data = (
                df.assign(time_id=time_ids)
                .rename(columns=self._FACT_COLUMNS)
                [list(self._FACT_COLUMNS.values())]
            )

            # Keep only rows whose keys exist in the dimensions.
            valid_customers = pd.read_sql('SELECT customer_id FROM CustomerDim', conn)['customer_id']
            valid_products = pd.read_sql('SELECT product_id FROM ProductDim', conn)['product_id']
            valid_times = pd.read_sql('SELECT time_id FROM TimeDim', conn)['time_id']

            is_valid = (
                fact_data['customer_id'].isin(valid_customers)
                & fact_data['product_id'].isin(valid_products)
                & fact_data['time_id'].isin(valid_times)
            )
            n_invalid = int((~is_valid).sum())
            if n_invalid:
                logger.warning("Dropped %d SalesFact rows with unknown foreign keys", n_invalid)

            # Keys are non-null after filtering; store them as INTEGER, not REAL.
            fact_data = fact_data[is_valid].astype(
                {'customer_id': 'int64', 'product_id': 'int64', 'time_id': 'int64'}
            )

            fact_data.to_sql('SalesFact', conn, if_exists='append', index=False)
            logger.info("Loaded %d records into SalesFact with valid foreign keys", len(fact_data))

        except Exception as e:
            logger.error("Error loading data: %s", e)
            raise
        finally:
            conn.close()

    def run_etl(self, num_records=1000):
        """Execute the full ETL pipeline.

        Args:
            num_records: Number of synthetic sales lines to generate.

        Returns:
            True on success. False if any step raised; the exception is logged
            with its traceback.
        """
        try:
            # Extract
            raw_data, products, customers = self.generate_synthetic_data(num_records)

            # Transform
            transformed_data = self.transform_data(raw_data)

            # Create database (drops existing tables)
            self.create_database()

            # Load
            self.load_data(transformed_data, products, customers)
        except Exception as e:
            # Top-level boundary: keep the traceback and report failure.
            logger.exception("ETL process failed: %s", e)
            return False

        logger.info("ETL process completed successfully")
        return True

if __name__ == "__main__":
    etl = RetailETL()
    # run_etl() logs failures and returns False instead of raising. Turn that
    # into a non-zero exit status so schedulers/CI can detect a failed run.
    if not etl.run_etl():
        raise SystemExit(1)

INFO:__main__:Generating 1000 synthetic retail records
INFO:__main__:Synthetic data generated with 1000 records
INFO:__main__:Starting data transformation
INFO:__main__:Data transformed. 518 records remaining after transformations
INFO:__main__:Database schema created (tables dropped and recreated)
INFO:__main__:Loaded 85 unique records into ProductDim
INFO:__main__:Loaded 100 unique records into CustomerDim
INFO:__main__:Loaded 285 unique records into TimeDim
INFO:__main__:Loaded 518 records into SalesFact with valid foreign keys
INFO:__main__:ETL process completed successfully
