## Extract data from CSV

In [236]:
import pandas as pd
import numpy as np
from supabase import create_client
from dotenv import load_dotenv
import os
import hashlib

load_dotenv()
supabase = create_client(os.getenv("SUPABASE_URL"), os.getenv("SUPABASE_KEY"))

In [237]:
def make_key(text):
    return int(hashlib.md5(str(text).encode()).hexdigest()[:8], 16)

In [238]:
print("📂 EXTRACT: Loading raw data...")

# Load data
data = pd.read_csv("./data/consumer_complaints.csv", low_memory=False)
print(f"📊 Loaded: {len(data):,} rows")

# Show data info
print(f"📅 Date range: {data['date_received'].min()} to {data['date_received'].max()}")
print(f"🏢 Companies: {data['company'].nunique():,}")
print(f"📋 Products: {data['product'].nunique()}")



📂 EXTRACT: Loading raw data...
📊 Loaded: 555,957 rows
📅 Date range: 01/01/2012 to 12/31/2015
🏢 Companies: 3,605
📋 Products: 11


## Transform Data

### Data Cleaning

In [239]:
def clean_data(df):
    """Clean data basics"""
    print("🧹 Cleaning data...")
    
    # Fill missing values
    df['sub_product'] = df['sub_product'].fillna('Not Specified')
    df['sub_issue'] = df['sub_issue'].fillna('General')
    df['consumer_complaint_narrative'] = df['consumer_complaint_narrative'].fillna('No Narrative')
    df['company_public_response'] = df['company_public_response'].fillna('No Response')
    df['tags'] = df['tags'].fillna('No Tag')
    df['consumer_consent_provided'] = df['consumer_consent_provided'].fillna('Not Provided')
    
    # Fix state (max 2 chars)
    df['state'] = df['state'].astype(str).str[:2].fillna('XX')
    df['zipcode'] = df['zipcode'].astype(str).str[:5].fillna('00000')
    
    # Fix column name
    if 'consumer_disputed?' in df.columns:
        df['consumer_disputed'] = df['consumer_disputed?']
        df = df.drop('consumer_disputed?', axis=1)
    
    print(f"✅ Data cleaned: {len(df)} rows")
    return df

### Data transformation

In [240]:
def process_row(row):
    """Process 1 row dan buat records untuk semua tabel"""
    
    # Generate keys
    date_rec = pd.to_datetime(row['date_received']).date()
    date_sent = pd.to_datetime(row['date_sent_to_company']).date()
    product_key = make_key(f"{row['product']}_{row['sub_product']}")
    company_key = make_key(row['company'])
    location_key = make_key(f"{row['state']}_{row['zipcode']}")
    issue_key = make_key(f"{row['issue']}_{row['sub_issue']}")
    response_key = make_key(f"{row['submitted_via']}_{row['company_response_to_consumer']}")
    
    # Date records (both dates)
    dates = []
    for date_val, date_col in [(date_rec, row['date_received']), (date_sent, row['date_sent_to_company'])]:
        dt = pd.to_datetime(date_col)
        dates.append({
            'date_key': str(date_val),
            'full_date': str(date_val),
            'year': dt.year,
            'quarter': dt.quarter,
            'month': dt.month,
            'day': dt.day,
            'day_of_week': dt.dayofweek,
            'month_name': dt.month_name(),
            'day_name': dt.day_name()
        })
    
    # Return ALL table records for this row
    return {
        'dim_date': dates,
        'dim_product': [{
            'product_key': product_key,
            'product': row['product'],
            'sub_product': row['sub_product']
        }],
        'dim_company': [{
            'company_key': company_key,
            'company': row['company']
        }],
        'dim_location': [{
            'location_key': location_key,
            'state': row['state'],
            'zipcode': row['zipcode']
        }],
        'dim_issue': [{
            'issue_key': issue_key,
            'issue': row['issue'],
            'sub_issue': row['sub_issue']
        }],
        'dim_response': [{
            'response_key': response_key,
            'submitted_via': row['submitted_via'],
            'company_response_to_consumer': row['company_response_to_consumer'],
            'consumer_consent_provided': row['consumer_consent_provided'],
            'timely_response': row['timely_response'],
            'consumer_disputed': row['consumer_disputed']
        }],
        'dim_complaint_detail': [{
            'complaint_detail_key': int(row['complaint_id']),
            'consumer_complaint_narrative': row['consumer_complaint_narrative'],
            'company_public_response': row['company_public_response'],
            'tags': row['tags']
        }],
        'fact_complaints': [{
            'complaint_detail_key': int(row['complaint_id']),
            'date_received_key': str(date_rec),
            'date_sent_key': str(date_sent),
            'product_key': product_key,
            'company_key': company_key,
            'location_key': location_key,
            'issue_key': issue_key,
            'response_key': response_key,
            'complaint_count': 1
        }]
    }

print("🔧 Helper functions ready")

🔧 Helper functions ready


In [241]:
print("\n🔧 TRANSFORM: Cleaning data...")

# Clean the data
cleaned_data = clean_data(data)

# Show missing values after cleaning
missing = cleaned_data.isnull().sum()
print(f"❓ Missing values: {missing.sum()} total")
if missing.sum() > 0:
    print(missing[missing > 0])

# Sample processed row
print("\n📋 Sample of 1 row transformation:")
sample_row = cleaned_data.iloc[0]
sample_records = process_row(sample_row)

print(f"Input: 1 row (complaint_id: {sample_row['complaint_id']})")
print("Output records to tables:")
for table, records in sample_records.items():
    print(f"  📊 {table}: {len(records)} record(s)")

print("✅ Transform ready")



🔧 TRANSFORM: Cleaning data...
🧹 Cleaning data...
✅ Data cleaned: 555957 rows
❓ Missing values: 0 total

📋 Sample of 1 row transformation:
Input: 1 row (complaint_id: 511074)
Output records to tables:
  📊 dim_date: 2 record(s)
  📊 dim_product: 1 record(s)
  📊 dim_company: 1 record(s)
  📊 dim_location: 1 record(s)
  📊 dim_issue: 1 record(s)
  📊 dim_response: 1 record(s)
  📊 dim_complaint_detail: 1 record(s)
  📊 fact_complaints: 1 record(s)
✅ Transform ready


## Load Data To Supabase Postgres

In [None]:
print("\n📤 LOAD: Processing batches to data warehouse...")

batch_size = 100
total_rows = len(cleaned_data)
batch_count = 0

# Process in batches
for i in range(0, total_rows, batch_size):
    batch_count += 1
    batch = cleaned_data.iloc[i:i+batch_size]
    
    print(f"\n📦 Batch {batch_count}: Processing rows {i+1} to {min(i+batch_size, total_rows)}")
    
    # Initialize batch containers
    all_batch_records = {
        'dim_date': [], 'dim_product': [], 'dim_company': [], 'dim_location': [],
        'dim_issue': [], 'dim_response': [], 'dim_complaint_detail': [], 'fact_complaints': []
    }
    
    # Process each row in batch
    for _, row in batch.iterrows():
        try:
            row_records = process_row(row)
            
            # Add records from this row to batch
            for table, records in row_records.items():
                all_batch_records[table].extend(records)
                
        except Exception as e:
            print(f"⚠️ Error processing row {row.name}: {e}")
            continue
    
    # Remove duplicates in dimensions
    for table in all_batch_records:
        if table == 'dim_date':
            # Remove duplicate dates
            seen = set()
            unique = []
            for record in all_batch_records[table]:
                if record['date_key'] not in seen:
                    seen.add(record['date_key'])
                    unique.append(record)
            all_batch_records[table] = unique
            
        elif table.startswith('dim_') and table != 'dim_date':
            # Remove duplicate dimensions
            key_field = f"{table.replace('dim_', '')}_key"
            seen = set()
            unique = []
            for record in all_batch_records[table]:
                key = record.get(key_field)
                if key not in seen:
                    seen.add(key)
                    unique.append(record)
            all_batch_records[table] = unique
    
    # Load to database in proper order
    load_order = ['dim_date', 'dim_product', 'dim_company', 'dim_location', 
                  'dim_issue', 'dim_response', 'dim_complaint_detail', 'fact_complaints']
    
    batch_total = 0
    for table in load_order:
        records = all_batch_records.get(table, [])
        if records:
            try:
                response = supabase.table(table).upsert(records).execute()
                print(f"  ✅ {table}: {len(records)} records")
                batch_total += len(records)
            except Exception as e:
                print(f"  ❌ {table}: {e}")
    
    print(f"  📊 Batch total: {batch_total} records loaded")
    progress = min(i + batch_size, total_rows) / total_rows * 100
    print(f"  📈 Progress: {progress:.1f}%")

print(f"\n🎉 LOAD completed! Processed {batch_count} batches")



📤 LOAD: Processing batches to data warehouse...

📦 Batch 1: Processing rows 1 to 100
  ✅ dim_date: 17 records
  ✅ dim_product: 20 records
  ✅ dim_company: 45 records
  ✅ dim_location: 94 records
  ✅ dim_issue: 41 records
  ✅ dim_response: 12 records
  ✅ dim_complaint_detail: 100 records
  ✅ fact_complaints: 100 records
  📊 Batch total: 429 records loaded
  📈 Progress: 0.0%

📦 Batch 2: Processing rows 101 to 200
  ✅ dim_date: 16 records
  ✅ dim_product: 18 records
  ✅ dim_company: 44 records
  ✅ dim_location: 97 records
  ✅ dim_issue: 35 records
  ✅ dim_response: 14 records
  ✅ dim_complaint_detail: 100 records
  ✅ fact_complaints: 100 records
  📊 Batch total: 424 records loaded
  📈 Progress: 0.0%

📦 Batch 3: Processing rows 201 to 300
  ✅ dim_date: 18 records
  ✅ dim_product: 19 records
  ✅ dim_company: 37 records
  ✅ dim_location: 96 records
  ✅ dim_issue: 38 records
  ✅ dim_response: 15 records
  ✅ dim_complaint_detail: 100 records
  ✅ fact_complaints: 100 records
  📊 Batch total: 4

In [None]:
print("\n🔍 VERIFY: Checking data warehouse...")

# Check record counts
tables = ['dim_date', 'dim_product', 'dim_company', 'dim_location', 
          'dim_issue', 'dim_response', 'dim_complaint_detail', 'fact_complaints']

print("📊 Final record counts:")
total_records = 0

for table in tables:
    try:
        count = supabase.table(table).select("*", count="exact").execute().count
        print(f"  📋 {table:<25}: {count:>6,} records")
        total_records += count
    except Exception as e:
        print(f"  ❌ {table:<25}: Error - {e}")

print(f"\n🎯 Total records in DW: {total_records:,}")
print(f"📊 From {len(cleaned_data):,} input rows")
print(f"💡 Ratio: {total_records/len(cleaned_data):.1f}x (1 input row → multiple DW records)")

# Sample verification - check if data is properly distributed
print("\n🔍 Sample verification:")
try:
    # Get sample fact record
    fact_sample = supabase.table('fact_complaints').select('*').limit(1).execute()
    if fact_sample.data:
        fact_record = fact_sample.data[0]
        print(f"✅ Sample fact record ID: {fact_record['complaint_detail_key']}")
        
        # Check if related dimension records exist
        checks = [
            ('dim_complaint_detail', 'complaint_detail_key', fact_record['complaint_detail_key']),
            ('dim_date', 'date_key', fact_record['date_received_key']),
            ('dim_product', 'product_key', fact_record['product_key']),
            ('dim_company', 'company_key', fact_record['company_key'])
        ]
        
        for table, key_field, key_value in checks:
            check = supabase.table(table).select('*').eq(key_field, key_value).execute()
            if check.data:
                print(f"✅ Related {table} record exists")
            else:
                print(f"❌ Missing {table} record for key {key_value}")
    
except Exception as e:
    print(f"⚠️ Verification error: {e}")

print("\n🎉 ETL COMPLETED SUCCESSFULLY!")
print("💡 Your star schema data warehouse is ready for analysis!")
print("🚀 Next: Connect your BI tool to Supabase and start analyzing!")


🔍 VERIFY: Checking data warehouse...
📊 Final record counts:
  📋 dim_date                 :     49 records
  📋 dim_product              :     28 records
  📋 dim_company              :    202 records
  📋 dim_location             :    853 records
  📋 dim_issue                :     85 records
  📋 dim_response             :     21 records
  📋 dim_complaint_detail     :  1,000 records
  📋 fact_complaints          :  1,000 records

🎯 Total records in DW: 3,238
📊 From 1,000 input rows
💡 Ratio: 3.2x (1 input row → multiple DW records)

🔍 Sample verification:
✅ Sample fact record ID: 511074
✅ Related dim_complaint_detail record exists
✅ Related dim_date record exists
✅ Related dim_product record exists
✅ Related dim_company record exists

🎉 ETL COMPLETED SUCCESSFULLY!
💡 Your star schema data warehouse is ready for analysis!
🚀 Next: Connect your BI tool to Supabase and start analyzing!
