In [27]:
# etl_load.ipynb
# ETL Load Phase - Data Storage and Verification


import pandas as pd
import sqlite3
import os
from datetime import datetime

print("ETL LOAD PHASE")
print("Initializing Data Loading Process")
print(f"Execution timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

os.makedirs('loaded', exist_ok=True)

transformed_data_path = r"C:\Users\lenovo\OneDrive\Desktop\KEndy\ASSIGNMENTS\DSA Assignments\DSA 2040\ET_EXAM_HOPE_317\transformed"

print(f"Accessing transformed data from: {transformed_data_path}")

# Load transformed datasets with comprehensive error handling
print("\n=== DATA LOADING OPERATION ===")

try:
    # Load transformed dataset
    full_data_path = os.path.join(transformed_data_path, 'transformed_full.csv')
    df_full = pd.read_csv(full_data_path)
    print(f"Successfully loaded full dataset: {df_full.shape[0]} records, {df_full.shape[1]} columns")
    
    # Load incremental transformed dataset
    incremental_data_path = os.path.join(transformed_data_path, 'transformed_incremental.csv')
    df_incremental = pd.read_csv(incremental_data_path)
    print(f"Successfully loaded incremental dataset: {df_incremental.shape[0]} records, {df_incremental.shape[1]} columns")
    
except FileNotFoundError as e:
    print(f" Required transformed data files not found at specified path")
    print(f"Error details: {e}")
    print("Please ensure the transform phase has been executed and files are available at the specified location")
    raise

except Exception as e:
    print(f"Data loading error: {e}")
    raise

# Data quality assessment before loading
print("\n=== PRE-LOAD DATA ASSESSMENT ===")

print("Dataset structure analysis:")
print(f"Full dataset dimensions: {df_full.shape}")
print(f"Incremental dataset dimensions: {df_incremental.shape}")

print("\nColumn Architecture:")
print("Full dataset columns:", list(df_full.columns))
print(f"Total derived columns created in transform phase: {len(df_full.columns)}")

print("\nData Type Distribution:")
print(df_full.dtypes.value_counts())

print("\nSample Data Validation - First 3 Records:")
print(df_full.head(3))




ETL LOAD PHASE
Initializing Data Loading Process
Execution timestamp: 2025-10-30 23:50:13
Accessing transformed data from: C:\Users\lenovo\OneDrive\Desktop\KEndy\ASSIGNMENTS\DSA Assignments\DSA 2040\ET_EXAM_HOPE_317\transformed

=== DATA LOADING OPERATION ===
Successfully loaded full dataset: 1233 records, 35 columns
Successfully loaded incremental dataset: 5000 records, 35 columns

=== PRE-LOAD DATA ASSESSMENT ===
Dataset structure analysis:
Full dataset dimensions: (1233, 35)
Incremental dataset dimensions: (5000, 35)

Column Architecture:
Full dataset columns: ['row_id', 'order_id', 'order_date', 'ship_date', 'ship_mode', 'customer_id', 'customer_name', 'segment', 'country', 'city', 'state', 'postal_code', 'region', 'product_id', 'category', 'sub_category', 'product_name', 'sales', 'sales_category', 'sales_score', 'order_year', 'order_month', 'order_quarter', 'order_day', 'order_day_of_week', 'order_weekend', 'region_state', 'is_sales_outlier', 'cust_total_sales', 'cust_avg_sale', '

In [28]:
# OPTION 1: SQLite Database Implementation
print("DATABASE IMPLEMENTATION: SQLite STORAGE")

try:
    # Initialize SQLite database connection
    database_path = 'loaded/superstore_analytics.db'
    conn = sqlite3.connect(database_path)
    print("SQLite database connection established successfully")
    print(f"Database file: {database_path}")
    
    # Load full dataset into primary table
    df_full.to_sql('superstore_sales_full', conn, if_exists='replace', index=False)
    print("Full dataset loaded into 'superstore_sales_full' table")
    
    # Load incremental dataset into separate table
    df_incremental.to_sql('superstore_sales_incremental', conn, if_exists='replace', index=False)
    print("Incremental dataset loaded into 'superstore_sales_incremental' table")
    
    # Comprehensive database verification
    print("\n--- DATABASE VERIFICATION PROTOCOL ---")
    
    # Table existence verification
    cursor = conn.cursor()
    cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
    tables = cursor.fetchall()
    print("Database tables verified:")
    for table in tables:
        print(f"  - {table[0]}")
    
    # Record count validation
    full_count_query = "SELECT COUNT(*) as record_count FROM superstore_sales_full"
    incremental_count_query = "SELECT COUNT(*) as record_count FROM superstore_sales_incremental"
    
    full_count = pd.read_sql(full_count_query, conn)['record_count'][0]
    incremental_count = pd.read_sql(incremental_count_query, conn)['record_count'][0]
    
    print("\nRecord Count Validation:")
    print(f"  Full table: {full_count} records (expected: {len(df_full)})")
    print(f"  Incremental table: {incremental_count} records (expected: {len(df_incremental)})")
    
    # Schema validation
    schema_query = "PRAGMA table_info(superstore_sales_full);"
    schema_info = pd.read_sql(schema_query, conn)
    print(f"\nSchema Validation: {len(schema_info)} columns confirmed in database")
    
    # Data sampling for quality assurance
    sample_query = """
    SELECT 
        order_id, 
        customer_id, 
        sales, 
        sales_category,
        customer_tier
    FROM superstore_sales_full 
    LIMIT 5
    """
    sample_data = pd.read_sql(sample_query, conn)
    print("\nData Quality Sample (Business Columns):")
    print(sample_data)
    
    # Advanced analytics verification
    analytics_query = """
    SELECT 
        COUNT(DISTINCT customer_id) as unique_customers,
        AVG(sales) as average_sales,
        COUNT(DISTINCT sales_category) as sales_categories,
        COUNT(DISTINCT customer_tier) as customer_tiers
    FROM superstore_sales_full
    """
    analytics_result = pd.read_sql(analytics_query, conn)
    print("\nAdvanced Analytics Verification:")
    print(analytics_result)
    
    conn.close()
    print("Database connection closed successfully")
    
except Exception as e:
    print(f"Database operation error: {e}")
    if 'conn' in locals():
        conn.close()


DATABASE IMPLEMENTATION: SQLite STORAGE
SQLite database connection established successfully
Database file: loaded/superstore_analytics.db
Full dataset loaded into 'superstore_sales_full' table
Incremental dataset loaded into 'superstore_sales_incremental' table

--- DATABASE VERIFICATION PROTOCOL ---
Database tables verified:
  - superstore_sales_full
  - superstore_sales_incremental

Record Count Validation:
  Full table: 1233 records (expected: 1233)
  Incremental table: 5000 records (expected: 5000)

Schema Validation: 35 columns confirmed in database

Data Quality Sample (Business Columns):
         order_id customer_id     sales sales_category customer_tier
0  CA-2017-152156    CG-12520  261.9600           High        Silver
1  CA-2017-152156    CG-12520  731.9400           High        Silver
2  CA-2017-138688    DV-13045   14.6200       Very Low        Bronze
3  US-2016-108966    SO-20335  957.5775           High        Silver
4  US-2016-108966    SO-20335   22.3680            Lo

In [29]:
# OPTION 2: Parquet Format Implementation
print("COLUMNAR STORAGE: PARQUET FORMAT")

try:
    # Saving datasets in Parquet format
    full_parquet_path = 'loaded/superstore_sales_full.parquet'
    incremental_parquet_path = 'loaded/superstore_sales_incremental.parquet'
    
    # Export to Parquet with optimization
    df_full.to_parquet(full_parquet_path, index=False, engine='pyarrow', compression='snappy')
    print(f"Full dataset exported to Parquet: {full_parquet_path}")
    
    df_incremental.to_parquet(incremental_parquet_path, index=False, engine='pyarrow', compression='snappy')
    print(f"Incremental dataset exported to Parquet: {incremental_parquet_path}")
    
    # Parquet Verification Process
    print("\n--- PARQUET VERIFICATION PROTOCOL ---")
    
    # Read back Parquet files for validation
    full_parquet_data = pd.read_parquet(full_parquet_path)
    incremental_parquet_data = pd.read_parquet(incremental_parquet_path)
    
    print("Parquet Data Integrity Check:")
    print(f"  Full dataset - Records: {len(full_parquet_data)}, Columns: {len(full_parquet_data.columns)}")
    print(f"  Incremental dataset - Records: {len(incremental_parquet_data)}, Columns: {len(incremental_parquet_data.columns)}")
    
    # Schema comparison
    print("\nSchema Consistency Verification:")
    original_columns = set(df_full.columns)
    parquet_columns = set(full_parquet_data.columns)
    schema_match = original_columns == parquet_columns
    print(f"  Schema integrity: {schema_match}")
    
    # Data sample from Parquet
    print("\nParquet Data Sample (First 3 Records):")
    print(full_parquet_data[['order_id', 'sales', 'sales_category', 'customer_tier']].head(3))
    
except Exception as e:
    print(f"Parquet operation error: {e}")



COLUMNAR STORAGE: PARQUET FORMAT
Full dataset exported to Parquet: loaded/superstore_sales_full.parquet
Incremental dataset exported to Parquet: loaded/superstore_sales_incremental.parquet

--- PARQUET VERIFICATION PROTOCOL ---
Parquet Data Integrity Check:
  Full dataset - Records: 1233, Columns: 35
  Incremental dataset - Records: 5000, Columns: 35

Schema Consistency Verification:
  Schema integrity: True

Parquet Data Sample (First 3 Records):
         order_id   sales sales_category customer_tier
0  CA-2017-152156  261.96           High        Silver
1  CA-2017-152156  731.94           High        Silver
2  CA-2017-138688   14.62       Very Low        Bronze


In [30]:
# COMPREHENSIVE CROSS-FORMAT VALIDATION
print("CROSS-FORMAT DATA VALIDATION")

try:
    # Reconnect to database for comparison
    conn = sqlite3.connect('loaded/superstore_analytics.db')
    
    # Load data from all formats for comparison
    sqlite_data = pd.read_sql('SELECT * FROM superstore_sales_full', conn)
    parquet_data = pd.read_parquet('loaded/superstore_sales_full.parquet')
    
    conn.close()
    
    print("Multi-Format Data Consistency Analysis:")
    print(f"Original transformed data: {len(df_full)} records, {len(df_full.columns)} columns")
    print(f"SQLite database data: {len(sqlite_data)} records, {len(sqlite_data.columns)} columns")
    print(f"Parquet file data: {len(parquet_data)} records, {len(parquet_data.columns)} columns")
    
    # Validation metrics
    records_match = len(df_full) == len(sqlite_data) == len(parquet_data)
    columns_match = len(df_full.columns) == len(sqlite_data.columns) == len(parquet_data.columns)
    
    print("\nValidation Results:")
    print(f"  Record count consistency: {records_match}")
    print(f"  Column structure consistency: {columns_match}")
    print(f"  Data integrity across formats: {records_match and columns_match}")
    
except Exception as e:
    print(f"Cross-format validation error: {e}")

# STORAGE EFFICIENCY ANALYSIS
print("\n STORAGE EFFICIENCY METRICS")
print("_"*60)

def analyze_storage_efficiency(file_path):
    """Analyze file size and storage efficiency"""
    if os.path.exists(file_path):
        size_bytes = os.path.getsize(file_path)
        size_mb = size_bytes / (1024 * 1024)
        return size_mb
    return 0

# Calculate file sizes
csv_size = analyze_storage_efficiency(os.path.join(transformed_data_path, 'transformed_full.csv'))
sqlite_size = analyze_storage_efficiency('loaded/superstore_analytics.db')
parquet_size = analyze_storage_efficiency('loaded/superstore_sales_full.parquet')

print("Storage Efficiency Comparison:")
print(f"  CSV format: {csv_size:.2f} MB")
print(f"  SQLite database: {sqlite_size:.2f} MB")
print(f"  Parquet format: {parquet_size:.2f} MB")

if csv_size > 0:
    sqlite_efficiency = ((csv_size - sqlite_size) / csv_size) * 100
    parquet_efficiency = ((csv_size - parquet_size) / csv_size) * 100
    print(f"\nStorage Optimization:")
    print(f"  SQLite efficiency: {sqlite_efficiency:.1f}% reduction")
    print(f"  Parquet efficiency: {parquet_efficiency:.1f}% reduction")



CROSS-FORMAT DATA VALIDATION
Multi-Format Data Consistency Analysis:
Original transformed data: 1233 records, 35 columns
SQLite database data: 1233 records, 35 columns
Parquet file data: 1233 records, 35 columns

Validation Results:
  Record count consistency: True
  Column structure consistency: True
  Data integrity across formats: True

 STORAGE EFFICIENCY METRICS
____________________________________________________________
Storage Efficiency Comparison:
  CSV format: 0.37 MB
  SQLite database: 1.91 MB
  Parquet format: 0.12 MB

Storage Optimization:
  SQLite efficiency: -413.6% reduction
  Parquet efficiency: 68.9% reduction


In [31]:
# BUSINESS READINESS ASSESSMENT
print("BUSINESS READINESS ASSESSMENT")
print("-"*60)

print("Data Availability for Business Intelligence:")
print("  SQLite Database: Ready for SQL-based analytics and reporting")
print("  Parquet Files: Optimized for big data processing and analytics")
print("  Data Integrity: Verified across all storage formats")
print("  Schema Consistency: Maintained through ETL pipeline")

print("\nAvailable Analytical Capabilities:")
print("  - Customer segmentation analysis (Bronze/Silver/Gold tiers)")
print("  - Sales performance categorization")
print("  - Temporal trend analysis")
print("  - Regional performance metrics")
print("  - Product category analytics")

# FINAL IMPLEMENTATION SUMMARY
print("LOAD PHASE IMPLEMENTATION SUMMARY")
print("_"*60)

print("SUCCESSFULLY COMPLETED OPERATIONS:")
print(" Transformed data loaded from specified directory")
print(" SQLite database created with optimized table structure")
print(" Parquet files generated with efficient compression")
print(" Comprehensive data validation across all formats")
print(" Storage efficiency analysis completed")
print(" Business readiness assessment finalized")

print(f"\nOutput Files Generated:")
print(f"  Database: loaded/superstore_analytics.db")
print(f"  Parquet Files: loaded/superstore_sales_full.parquet")
print(f"                loaded/superstore_sales_incremental.parquet")

print(f"\nTotal Records Processed: {len(df_full)}")
print(f"Total Columns Maintained: {len(df_full.columns)}")
print(f"Data Formats Supported: SQLite, Parquet")

print(f"\nLoad phase completed at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("ETL Pipeline Status: FULLY OPERATIONAL")

BUSINESS READINESS ASSESSMENT
------------------------------------------------------------
Data Availability for Business Intelligence:
  SQLite Database: Ready for SQL-based analytics and reporting
  Parquet Files: Optimized for big data processing and analytics
  Data Integrity: Verified across all storage formats
  Schema Consistency: Maintained through ETL pipeline

Available Analytical Capabilities:
  - Customer segmentation analysis (Bronze/Silver/Gold tiers)
  - Sales performance categorization
  - Temporal trend analysis
  - Regional performance metrics
  - Product category analytics
LOAD PHASE IMPLEMENTATION SUMMARY
____________________________________________________________
SUCCESSFULLY COMPLETED OPERATIONS:
 Transformed data loaded from specified directory
 SQLite database created with optimized table structure
 Parquet files generated with efficient compression
 Comprehensive data validation across all formats
 Storage efficiency analysis completed
 Business readiness asse