In [16]:
# etl_load.py
# DSA 2040A - Lab 4: Load in ETL (Shopping Trends)
# Name: Queen Esther Kibegi
# Course: DSA 2040A – Data Science & Analytics
# Lab: Lab 4 – Load in ETL
# Dataset Used: transformed_full.csv, transformed_incremental.csv

import pandas as pd
import sqlite3
from sqlalchemy import create_engine
import os
from datetime import datetime

def create_sqlite_table(conn, table_name, schema):
    """
    Create a SQLite table with the given schema.
    
    Args:
        conn: SQLite connection object
        table_name: Name of the table to create
        schema: SQL schema string for table creation
    """
    try:
        cursor = conn.cursor()
        cursor.execute(schema)
        conn.commit()
        print(f"Table {table_name} created successfully.")
    except sqlite3.Error as e:
        print(f"Error creating table {table_name}: {e}")

def save_to_parquet(df, output_path):
    """
    Save a DataFrame to a Parquet file.
    
    Args:
        df: pandas DataFrame to save
        output_path: Path to save the Parquet file
    """
    try:
        df.to_parquet(output_path, index=False)
        print(f"Saved DataFrame to {output_path}")
    except Exception as e:
        print(f"Error saving to Parquet: {e}")

def verify_sqlite_data(conn, table_name):
    """
    Verify data in a SQLite table by printing the first 5 rows.
    
    Args:
        conn: SQLite connection object
        table_name: Name of the table to query
    """
    try:
        query = f"SELECT * FROM {table_name} LIMIT 5"
        df = pd.read_sql_query(query, conn)
        print(f"Preview of {table_name}:\n{df}\n")
    except sqlite3.Error as e:
        print(f"Error verifying {table_name}: {e}")

def verify_parquet_data(file_path):
    """
    Verify data in a Parquet file by printing the first 5 rows.
    
    Args:
        file_path: Path to the Parquet file
    """
    try:
        df = pd.read_parquet(file_path)
        print(f"Preview of {file_path}:\n{df.head()}\n")
    except Exception as e:
        print(f"Error verifying {file_path}: {e}")

def validate_dates(df, date_column):
    """
    Validate and convert date column to datetime, handling invalid dates.
    
    Args:
        df: pandas DataFrame
        date_column: Name of the date column to validate
    Returns:
        DataFrame with validated date column
    """
    try:
        df[date_column] = pd.to_datetime(df[date_column], errors='coerce')
        invalid_dates = df[df[date_column].isna()]
        if not invalid_dates.empty:
            print(f"Warning: {len(invalid_dates)} rows with invalid dates in {date_column} set to NaT.")
        return df
    except Exception as e:
        print(f"Error validating dates in {date_column}: {e}")
        return df

def main():
    print("Starting ETL Load Phase for Lab 5...")

    # Section 1: Load Setup
    # Define file paths
    FULL_CSV_PATH = 'transformed_full.csv'
    INCREMENTAL_CSV_PATH = 'transformed_incremental.csv'
    OUTPUT_DIR = 'loaded_data'
    FULL_DB_PATH = os.path.join(OUTPUT_DIR, 'full_data.db')
    INCREMENTAL_DB_PATH = os.path.join(OUTPUT_DIR, 'incremental_data.db')
    FULL_PARQUET_PATH = os.path.join(OUTPUT_DIR, 'full_data.parquet')
    INCREMENTAL_PARQUET_PATH = os.path.join(OUTPUT_DIR, 'incremental_data.parquet')

    # Create output directory if it doesn't exist
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    # SQLite connection setup
    full_conn = sqlite3.connect(FULL_DB_PATH)
    incremental_conn = sqlite3.connect(INCREMENTAL_DB_PATH)
    full_engine = create_engine(f'sqlite:///{FULL_DB_PATH}')
    incremental_engine = create_engine(f'sqlite:///{INCREMENTAL_DB_PATH}')

    print('Setup complete. File paths and database connections established.')

    # Section 2: Load Full Transformed Data
    FULL_TABLE_SCHEMA = '''
    CREATE TABLE IF NOT EXISTS full_data (
        id INTEGER PRIMARY KEY,
        customer_name TEXT,
        product TEXT,
        quantity INTEGER,
        unit_price REAL,
        total_price REAL,
        order_date TEXT
    )
    '''

    try:
        # Read CSV
        full_df = pd.read_csv(FULL_CSV_PATH)
        
        # Validate Order Date
        full_df = validate_dates(full_df, 'Order Date')
        
        # Map dataset columns to schema
        mapped_df = pd.DataFrame({
            'id': full_df.get('Customer ID', pd.Series(range(1, len(full_df) + 1))),
            'customer_name': pd.Series('Unknown', index=full_df.index),  # No customer_name in dataset
            'product': full_df.get('Category', pd.Series('Unknown', index=full_df.index)),
            'quantity': 1,  # Assumed as 1 per dataset structure
            'unit_price': full_df.get('Purchase Amount (USD)', 0.0),
            'total_price': full_df.get('Purchase Amount (USD)', 0.0),  # No Total_Price, use Purchase Amount
            'order_date': full_df.get('Order Date', pd.Series(None, index=full_df.index)).astype(str)
        })
        
        # Create SQLite table
        create_sqlite_table(full_conn, 'full_data', FULL_TABLE_SCHEMA)
        
        # Load to SQLite
        mapped_df.to_sql('full_data', full_engine, if_exists='replace', index=False)
        
        # Save to Parquet
        save_to_parquet(mapped_df, FULL_PARQUET_PATH)
        
        print('Full data loaded successfully into SQLite and Parquet.')
    except Exception as e:
        print(f'Error loading full data: {e}')

    # Section 3: Load Incremental Transformed Data
    INCREMENTAL_TABLE_SCHEMA = '''
    CREATE TABLE IF NOT EXISTS incremental_data (
        id INTEGER PRIMARY KEY,
        customer_name TEXT,
        product TEXT,
        quantity INTEGER,
        unit_price REAL,
        total_price REAL,
        order_date TEXT
    )
    '''

    try:
        # Read CSV
        incremental_df = pd.read_csv(INCREMENTAL_CSV_PATH)
        
        # Validate Order Date
        incremental_df = validate_dates(incremental_df, 'Order Date')
        
        # Map dataset columns to schema
        mapped_incremental_df = pd.DataFrame({
            'id': incremental_df.get('Customer ID', pd.Series(range(1, len(incremental_df) + 1))),
            'customer_name': pd.Series('Unknown', index=incremental_df.index),  # No customer_name in dataset
            'product': incremental_df.get('Category', pd.Series('Unknown', index=incremental_df.index)),
            'quantity': 1,  # Assumed as 1 per dataset structure
            'unit_price': incremental_df.get('Purchase Amount (USD)', 0.0),
            'total_price': incremental_df.get('Purchase Amount (USD)', 0.0),  # No Total_Price, use Purchase Amount
            'order_date': incremental_df.get('Order Date', pd.Series(None, index=incremental_df.index)).astype(str)
        })
        
        # Create SQLite table
        create_sqlite_table(incremental_conn, 'incremental_data', INCREMENTAL_TABLE_SCHEMA)
        
        # Load to SQLite
        mapped_incremental_df.to_sql('incremental_data', incremental_engine, if_exists='replace', index=False)
        
        # Save to Parquet
        save_to_parquet(mapped_incremental_df, INCREMENTAL_PARQUET_PATH)
        
        print('Incremental data loaded successfully into SQLite and Parquet.')
    except Exception as e:
        print(f'Error loading incremental data: {e}')

    # Section 4: Verification
    print('\nVerifying SQLite full_data table:')
    verify_sqlite_data(full_conn, 'full_data')

    print('\nVerifying SQLite incremental_data table:')
    verify_sqlite_data(incremental_conn, 'incremental_data')

    print('\nVerifying full_data.parquet:')
    verify_parquet_data(FULL_PARQUET_PATH)

    print('\nVerifying incremental_data.parquet:')
    verify_parquet_data(INCREMENTAL_PARQUET_PATH)

    # Close database connections
    full_conn.close()
    incremental_conn.close()
    print("ETL Load Phase for Lab 5 completed.")

if __name__ == "__main__":
    main()

Starting ETL Load Phase for Lab 5...
Setup complete. File paths and database connections established.
Table full_data created successfully.
Saved DataFrame to loaded_data\full_data.parquet
Full data loaded successfully into SQLite and Parquet.
Table incremental_data created successfully.
Saved DataFrame to loaded_data\incremental_data.parquet
Incremental data loaded successfully into SQLite and Parquet.

Verifying SQLite full_data table:
Preview of full_data:
   id customer_name   product  quantity  unit_price  total_price  \
0   1       Unknown  Clothing         1          53           53   
1   2       Unknown  Clothing         1          64           64   
2   3       Unknown  Clothing         1          73           73   
3   4       Unknown  Footwear         1          90           90   
4   5       Unknown  Clothing         1          49           49   

            order_date  
0  2025-04-20 12:00:00  
1  2025-04-20 12:00:00  
2  2025-04-21 12:00:00  
3  2025-04-22 12:00:00  
4 