# SQL Data Loading - Star Schema for Business Intelligence

This notebook loads cleaned data into a SQL database using a **Star Schema** dimensional model optimized for BI and analytics.

## Architecture:
```
Cleaned CSVs → Star Schema (PostgreSQL on Supabase)
                  ├── Fact: fact_sales
                  └── Dimensions:
                      ├── dim_customer
                      ├── dim_product
                      ├── dim_date
                      ├── dim_geography
                      └── dim_order
```

# 01. Setup & Database Connection

In [16]:
# 01. Setup & Libraries
import pandas as pd
import numpy as np
from sqlalchemy import create_engine, text
from datetime import datetime
import os
import warnings
from dotenv import load_dotenv
from pathlib import Path

# Suppress warnings for cleaner output
warnings.filterwarnings('ignore')

# Pandas display configuration
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 100)
pd.set_option('display.float_format', '{:.2f}'.format)

# Load environment variables from project root
# Find the .env file in the parent directory (project root)
env_path = Path('..') / '.env'

# IMPORTANT: override=True forces reload even if variables already exist
load_dotenv(dotenv_path=env_path, override=True)

print(f"Loading .env from: {env_path.absolute()}")
print(f"File exists: {env_path.exists()}")

# Supabase Database Configuration
DATABASE_URL = os.getenv('DATABASE_URL')

# Alternative: Build connection string from individual components
if not DATABASE_URL:
    SUPABASE_HOST = os.getenv('SUPABASE_HOST')
    SUPABASE_PORT = os.getenv('SUPABASE_PORT', '5432')
    SUPABASE_DATABASE = os.getenv('SUPABASE_DATABASE', 'postgres')
    SUPABASE_USER = os.getenv('SUPABASE_USER')
    SUPABASE_PASSWORD = os.getenv('SUPABASE_PASSWORD')
    
    # Debug: Check if variables are loaded
    print(f"\nLoaded credentials:")
    print(f"Host: {SUPABASE_HOST}")
    print(f"User: {SUPABASE_USER}")
    print(f"Port: {SUPABASE_PORT}")
    print(f"Password: {'*' * len(SUPABASE_PASSWORD) if SUPABASE_PASSWORD else 'NOT SET'}")
    
    if not all([SUPABASE_HOST, SUPABASE_USER, SUPABASE_PASSWORD]):
        raise ValueError("❌ Missing Supabase credentials in .env file!")
    
    DATABASE_URL = f"postgresql://{SUPABASE_USER}:{SUPABASE_PASSWORD}@{SUPABASE_HOST}:{SUPABASE_PORT}/{SUPABASE_DATABASE}"

# Create SQLAlchemy engine
engine = create_engine(DATABASE_URL)

# Test connection
print("\nTesting connection...")
try:
    with engine.connect() as conn:
        result = conn.execute(text("SELECT version();"))
        version = result.fetchone()[0]
        print("✅ Connected to Supabase PostgreSQL!")
        print(f"Database version: {version[:80]}...")
except Exception as e:
    print(f"❌ Connection failed: {e}")
    print("\n💡 Troubleshooting:")
    print("   1. Verify your password is correct in .env")
    print("   2. Check Supabase dashboard for connection string")
    print("   3. Check that the pooler host and port match the Supabase dashboard")
    print("   4. Try restarting the Jupyter kernel")

Loading .env from: /Users/diegoferra/Documents/Python codes/bloque_clase/notebooks/../.env
File exists: True

Loaded credentials:
Host: aws-1-us-east-1.pooler.supabase.com
User: postgres.ypznufmiuekmrtdjmcux
Port: 6543
Password: *********

Testing connection...
✅ Connected to Supabase PostgreSQL!
Database version: PostgreSQL 17.6 on aarch64-unknown-linux-gnu, compiled by gcc (GCC) 13.2.0, 64-b...


In [17]:
# Load cleaned datasets with low_memory=False to avoid dtype warnings
print("\n📂 Loading cleaned datasets from data/processed/\n")

customerAddress = pd.read_csv('../data/processed/clean_CustomerAddress.csv', low_memory=False)
individualCustomer = pd.read_csv('../data/processed/clean_IndividualCustomer.csv', low_memory=False)
productCatalog = pd.read_csv('../data/processed/clean_ProductCatalog.csv', low_memory=False)
ordersList = pd.read_csv('../data/processed/clean_OrdersList.csv', low_memory=False)
generalOrder = pd.read_csv('../data/processed/clean_GeneralOrderDetail.csv', low_memory=False)
productOrderDetail = pd.read_csv('../data/processed/clean_ProductOrderDetail.csv', low_memory=False)

# Verify datasets loaded successfully
datasets = {
    'Customer Address': customerAddress,
    'Individual Customer': individualCustomer,
    'Product Catalog': productCatalog,
    'Orders List': ordersList,
    'General Order': generalOrder,
    'Product Order Detail': productOrderDetail
}

print("📊 DATASETS LOADED SUCCESSFULLY")
print("=" * 70)
for name, df in datasets.items():
    memory_mb = df.memory_usage(deep=True).sum() / 1024**2
    print(f"{name:25s}: {len(df):>7,} rows × {len(df.columns):>3} cols | {memory_mb:>6.2f} MB")
print("=" * 70)

# Calculate total statistics
total_rows = sum(len(df) for df in datasets.values())
total_memory = sum(df.memory_usage(deep=True).sum() / 1024**2 for df in datasets.values())
print(f"{'TOTAL':25s}: {total_rows:>7,} rows             | {total_memory:>6.2f} MB")
print("=" * 70)
print("\n✅ All datasets ready for transformation into Star Schema!")


📂 Loading cleaned datasets from data/processed/

📊 DATASETS LOADED SUCCESSFULLY
Customer Address         : 221,437 rows ×  26 cols | 361.24 MB
Individual Customer      : 178,494 rows ×  52 cols | 386.59 MB
Product Catalog          :   7,158 rows ×   6 cols |   1.95 MB
Orders List              :  67,831 rows ×  38 cols | 170.59 MB
General Order            :  59,310 rows ×  43 cols | 105.25 MB
Product Order Detail     :  87,609 rows × 108 cols | 256.62 MB
TOTAL                    : 621,839 rows             | 1282.23 MB

✅ All datasets ready for transformation into Star Schema!


# 02. Star Schema Design

## Dimensional Model Overview

The star schema consists of one **fact table** surrounded by **dimension tables**. This design optimizes query performance for analytical workloads and BI dashboards.

## 📊 Fact Table: `fact_sales`

**Grain:** One row per product item in an order

**Purpose:** Stores transactional sales data with foreign keys to dimensions

### Schema:

| Column | Type | Description |
|--------|------|-------------|
| `sale_id` | INTEGER PRIMARY KEY | Surrogate key (auto-increment) |
| `order_id` | VARCHAR(50) | Business key - Order identifier |
| `customer_key` | INTEGER FK | → dim_customer |
| `product_key` | INTEGER FK | → dim_product |
| `date_key` | INTEGER FK | → dim_date (YYYYMMDD format) |
| `geography_key` | INTEGER FK | → dim_geography |
| `order_key` | INTEGER FK | → dim_order |
| **Measures (Metrics):** | | |
| `quantity` | INTEGER | Units sold |
| `unit_price` | DECIMAL(15,2) | Price per unit |
| `list_price` | DECIMAL(15,2) | Original list price |
| `selling_price` | DECIMAL(15,2) | Final selling price |
| `discount_amount` | DECIMAL(15,2) | Discount applied |
| `shipping_price` | DECIMAL(15,2) | Shipping cost |
| `total_amount` | DECIMAL(15,2) | Total transaction value |
| `is_gift` | BOOLEAN | Gift flag |

**Source Tables:** `productOrderDetail` (primary), `ordersList`, `generalOrder`

## 👤 Dimension: `dim_customer`

**Purpose:** Customer profile and demographic information

### Schema:

| Column | Type | Description |
|--------|------|-------------|
| `customer_key` | INTEGER PRIMARY KEY | Surrogate key |
| `user_id` | VARCHAR(50) UNIQUE | Business key |
| `birth_date` | DATE | Date of birth |
| `customer_age` | INTEGER | Calculated age |
| `gender` | VARCHAR(10) | Customer gender |
| `email` | VARCHAR(255) | Email address |
| `phone` | VARCHAR(50) | Phone number |
| `first_purchase_date` | DATE | Date of first purchase |
| `last_session_date` | TIMESTAMP | Last platform activity |
| `is_active` | BOOLEAN | Active customer flag |
| `created_at` | TIMESTAMP | Record creation timestamp |

**Source Table:** `individualCustomer`

**SCD Type:** Type 1 (overwrite) - For this project, we assume customer data doesn't need historical tracking
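
As a rough illustration of what Type 1 (overwrite) means in practice, the sketch below upserts a customer row so that a changed attribute replaces the old value instead of creating a new version. It uses an in-memory SQLite database and a trimmed-down, hypothetical `dim_customer` with only `user_id` and `email`. The notebook itself targets PostgreSQL and rebuilds the tables on each run, so this is an assumption about how incremental updates could look, not the loading method used here.

```python
import sqlite3

# In-memory database; the reduced dim_customer below is a hypothetical stand-in
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE dim_customer (
        customer_key INTEGER PRIMARY KEY AUTOINCREMENT,
        user_id TEXT UNIQUE,
        email TEXT
    )
""")

# Type 1 behaviour: on a business-key conflict, overwrite the attribute in place
upsert = """
    INSERT INTO dim_customer (user_id, email)
    VALUES (?, ?)
    ON CONFLICT(user_id) DO UPDATE SET email = excluded.email
"""
conn.execute(upsert, ("u-001", "old@example.com"))
conn.execute(upsert, ("u-001", "new@example.com"))  # same customer, new email

rows = conn.execute(
    "SELECT customer_key, user_id, email FROM dim_customer"
).fetchall()
print(rows)  # one row: the surrogate key is kept, the email is overwritten
```

PostgreSQL accepts a similar `INSERT ... ON CONFLICT (user_id) DO UPDATE` form, so the same idea should carry over to the Supabase database with minor changes.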

## 🛒 Dimension: `dim_product`

**Purpose:** Product catalog and hierarchy information

### Schema:

| Column | Type | Description |
|--------|------|-------------|
| `product_key` | INTEGER PRIMARY KEY | Surrogate key |
| `product_id` | VARCHAR(50) UNIQUE | Business key (IdMaterial) |
| `product_name` | VARCHAR(255) | Product material name |
| `ean_upc` | VARCHAR(50) | Barcode |
| `brand` | VARCHAR(100) | Product brand |
| `category` | VARCHAR(100) | Product category |
| `segment` | VARCHAR(100) | Product segment |
| `is_active` | BOOLEAN | Active in catalog |

**Source Table:** `productCatalog`

## 📅 Dimension: `dim_date`

**Purpose:** Time intelligence for temporal analysis

### Schema:

| Column | Type | Description |
|--------|------|-------------|
| `date_key` | INTEGER PRIMARY KEY | YYYYMMDD format (e.g., 20210115) |
| `full_date` | DATE UNIQUE | Actual date |
| `year` | INTEGER | Year (2021, 2022) |
| `quarter` | INTEGER | Quarter (1-4) |
| `month` | INTEGER | Month (1-12) |
| `month_name` | VARCHAR(20) | Month name (January, etc.) |
| `week_of_year` | INTEGER | ISO week number |
| `day_of_month` | INTEGER | Day (1-31) |
| `day_of_week` | INTEGER | Weekday (1=Monday, 7=Sunday) |
| `day_name` | VARCHAR(20) | Day name (Monday, etc.) |
| `is_weekend` | BOOLEAN | Weekend flag |
| `is_holiday` | BOOLEAN | Holiday flag (optional) |
| `quarter_name` | VARCHAR(10) | Q1, Q2, Q3, Q4 |
| `year_month` | VARCHAR(10) | YYYY-MM format |

**Source:** Generated programmatically from date range in data (Jan 2021 - Nov 2022)

**Note:** This is a conformed dimension - same date dimension used across all facts
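
The `YYYYMMDD` key format can be sketched with two small helpers. The names `to_date_key` and `from_date_key` are illustrative and do not appear elsewhere in this notebook.

```python
from datetime import date, datetime


def to_date_key(d: date) -> int:
    """Encode a date as a YYYYMMDD integer, e.g. 2021-01-15 -> 20210115."""
    return d.year * 10000 + d.month * 100 + d.day


def from_date_key(key: int) -> date:
    """Decode a YYYYMMDD integer back into a date."""
    return datetime.strptime(str(key), "%Y%m%d").date()


key = to_date_key(date(2021, 1, 15))
print(key)                 # 20210115
print(from_date_key(key))  # 2021-01-15
```

Because the key sorts in the same order as the calendar, range filters on `date_key` behave like range filters on the date itself.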

## 📍 Dimension: `dim_geography`

**Purpose:** Location and address information for geographic analysis

### Schema:

| Column | Type | Description |
|--------|------|-------------|
| `geography_key` | INTEGER PRIMARY KEY | Surrogate key |
| `address_id` | VARCHAR(50) | Business key |
| `user_id` | VARCHAR(50) | Associated customer |
| `country` | VARCHAR(100) | Country name |
| `state` | VARCHAR(100) | State/province |
| `city` | VARCHAR(100) | City name |
| `neighborhood` | VARCHAR(100) | Neighborhood |
| `postal_code` | VARCHAR(20) | ZIP/postal code |
| `street` | VARCHAR(255) | Street address |
| `latitude` | DECIMAL(12,8) | Geographic coordinate |
| `longitude` | DECIMAL(12,8) | Geographic coordinate |
| `address_type` | VARCHAR(50) | Residential, commercial, etc. |
| `is_default` | BOOLEAN | Default address flag |

**Source Table:** `customerAddress`

## 📦 Dimension: `dim_order`

**Purpose:** Order-level attributes and status information

### Schema:

| Column | Type | Description |
|--------|------|-------------|
| `order_key` | INTEGER PRIMARY KEY | Surrogate key |
| `order_id` | VARCHAR(50) UNIQUE | Business key |
| `creation_date` | TIMESTAMP | Order creation timestamp |
| `authorized_date` | TIMESTAMP | Payment authorization |
| `invoiced_date` | TIMESTAMP | Invoice date |
| `order_status` | VARCHAR(50) | Current status |
| `payment_method` | VARCHAR(50) | Payment type |
| `shipping_estimated_date` | DATE | Estimated delivery |
| `shipping_estimated_min` | DATE | Min delivery estimate |
| `shipping_estimated_max` | DATE | Max delivery estimate |
| `days_to_shipping` | INTEGER | Days from order to ship |
| `order_year` | INTEGER | Order year |
| `order_month` | INTEGER | Order month |
| `order_quarter` | INTEGER | Order quarter |
| `order_day_of_week` | INTEGER | Order weekday |
| `channel` | VARCHAR(50) | Sales channel |
| `seller_id` | VARCHAR(50) | Seller identifier |

**Source Tables:** `ordersList`, `generalOrder`

## 🔗 Relationships & Cardinality

```
dim_customer (1) ──────── (*) fact_sales
dim_product (1)  ──────── (*) fact_sales
dim_date (1)     ──────── (*) fact_sales
dim_geography (1)──────── (*) fact_sales
dim_order (1)    ──────── (*) fact_sales
```

**Key Points:**
- All relationships are **1:Many** (dimension → fact)
- Fact table contains **only foreign keys + measures**, plus the degenerate dimension `order_id`
- Dimensions are **denormalized** for query performance
- Date dimension is **pre-populated** with all dates in range
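
To make the join pattern concrete, here is a minimal sketch of a typical star-schema query: the fact table is joined to two dimensions and aggregated. It runs against an in-memory SQLite database with reduced, hypothetical versions of the tables and invented sample rows, so the brands and amounts are placeholders rather than values from the real data.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    -- Reduced, hypothetical versions of the schema with made-up sample rows
    CREATE TABLE dim_product (product_key INTEGER PRIMARY KEY, brand TEXT);
    CREATE TABLE dim_date (date_key INTEGER PRIMARY KEY, year_month TEXT);
    CREATE TABLE fact_sales (
        sale_id INTEGER PRIMARY KEY,
        product_key INTEGER REFERENCES dim_product(product_key),
        date_key INTEGER REFERENCES dim_date(date_key),
        quantity INTEGER,
        total_amount NUMERIC
    );
    INSERT INTO dim_product VALUES (1, 'BrandA'), (2, 'BrandB');
    INSERT INTO dim_date VALUES (20210115, '2021-01'), (20210220, '2021-02');
    INSERT INTO fact_sales (product_key, date_key, quantity, total_amount) VALUES
        (1, 20210115, 2, 200),
        (1, 20210220, 1, 100),
        (2, 20210115, 3, 450);
""")

# One join per dimension, then aggregate the measure
revenue = conn.execute("""
    SELECT p.brand, d.year_month, SUM(f.total_amount) AS revenue
    FROM fact_sales AS f
    JOIN dim_product AS p ON p.product_key = f.product_key
    JOIN dim_date AS d ON d.date_key = f.date_key
    GROUP BY p.brand, d.year_month
    ORDER BY p.brand, d.year_month
""").fetchall()

for brand, year_month, total in revenue:
    print(brand, year_month, total)
```

Each dimension adds exactly one join on an integer key, which is the property that keeps BI queries against this model simple.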

## 📝 Design Decisions & Notes

### 1. Grain Selection
- **Fact grain:** Product line item per order (most atomic level)
- Allows aggregation to any level: order, customer, product, day, etc.

### 2. Surrogate Keys
- All dimensions except `dim_date` use auto-increment surrogate keys; `dim_date` uses a YYYYMMDD integer key
- Business keys (userId, orderId, productId) preserved for reference
- Simplifies joins and improves performance

### 3. Slowly Changing Dimensions (SCD)
- **Type 1 (Overwrite)** for all dimensions
- No historical tracking needed for this project
- Future enhancement: Type 2 for customer/product changes

### 4. Degenerate Dimensions
- `order_id` stored in fact table (not just FK)
- Allows grouping by order without joining dim_order
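
A small sketch of why the degenerate dimension is convenient: order-level totals can be computed from `fact_sales` alone. The table below is a reduced, hypothetical version with invented rows, held in an in-memory SQLite database.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    -- Hypothetical line items: ORD-1 has two products, ORD-2 has one
    CREATE TABLE fact_sales (
        sale_id INTEGER PRIMARY KEY,
        order_id TEXT,
        order_key INTEGER,
        total_amount NUMERIC
    );
    INSERT INTO fact_sales (order_id, order_key, total_amount) VALUES
        ('ORD-1', 2, 100),
        ('ORD-1', 2, 50),
        ('ORD-2', 3, 250);
""")

# Roll line items up to order level without touching dim_order
order_totals = conn.execute("""
    SELECT order_id, COUNT(*) AS line_items, SUM(total_amount) AS order_total
    FROM fact_sales
    GROUP BY order_id
    ORDER BY order_id
""").fetchall()

# Average order value derived from the per-order totals
avg_order_value = sum(total for _, _, total in order_totals) / len(order_totals)
print(order_totals)
print(avg_order_value)
```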

### 5. Conformed Dimensions
- `dim_date` is a conformed dimension
- Can be reused across multiple fact tables if schema expands

### 6. Missing Data Handling
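- Unknown/missing dimension values → special "Unknown" record with a reserved key (key 1 in the `SERIAL`-keyed dimensions, because it is inserted first; `19000101` in `dim_date`)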
- Example: Unknown customer, Unknown product, etc.
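
The fallback logic can be sketched in plain Python as follows. The lookup dictionary, the `resolve_key` helper, and the sample IDs are hypothetical. The reserved key is set to 1 on the assumption that the Unknown row is inserted first into an empty `SERIAL`-keyed table, which is how the Unknown inserts later in this notebook are arranged.

```python
# Assumption: the Unknown row received surrogate key 1 (first insert into an empty table)
UNKNOWN_KEY = 1

# Hypothetical business-key -> surrogate-key lookup, as read back from dim_customer
customer_lookup = {"UNKNOWN": 1, "u-001": 2, "u-002": 3}


def resolve_key(business_key, lookup, unknown_key=UNKNOWN_KEY):
    """Return the surrogate key, falling back to the Unknown row when the
    business key is missing or has no match in the dimension."""
    if business_key is None:
        return unknown_key
    return lookup.get(business_key, unknown_key)


order_user_ids = ["u-002", None, "u-999", "u-001"]
customer_keys = [resolve_key(u, customer_lookup) for u in order_user_ids]
print(customer_keys)  # [3, 1, 1, 2]
```

With this fallback every fact row carries a valid foreign key, so inner joins to the dimension do not silently drop sales with missing customer data.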

### 7. Data Types
- Decimals for monetary values (avoid floating point errors)
- VARCHAR with appropriate lengths
- DATE/TIMESTAMP for temporal columns
- BOOLEAN for flags
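
The floating-point concern behind the decimal choice is easy to reproduce with Python's standard `decimal` module, shown here as a short illustration with arbitrary amounts:

```python
from decimal import Decimal

# Binary floats cannot represent 0.1 or 0.2 exactly, so the sum drifts
float_total = 0.1 + 0.2
print(float_total == 0.3)  # False

# Decimal arithmetic keeps exact cents, like a DECIMAL column in SQL
decimal_total = Decimal("0.10") + Decimal("0.20")
print(decimal_total == Decimal("0.30"))  # True
```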

## 🎯 Business Metrics Enabled by This Model

This star schema design enables analysis of:

**Sales Performance:**
- Total revenue by period/product/customer
- Average order value
- Discount effectiveness
- Shipping cost analysis

**Customer Analytics:**
- Customer lifetime value (CLV)
- Customer segmentation by age/geography
- Repeat purchase rate
- Customer acquisition trends

**Product Analytics:**
- Top products by revenue/quantity
- Category performance
- Brand comparison
- Product mix analysis

**Geographic Analytics:**
- Sales by country/state/city
- Regional performance
- Market penetration

**Temporal Analytics:**
- Seasonality patterns
- Year-over-year growth
- Weekend vs weekday sales
- Monthly/quarterly trends

**Operational Metrics:**
- Fulfillment time (days to shipping)
- Order status distribution
- Payment method preferences

# 03. Create SQL Tables (DDL)

In [None]:
print("🔧 Creating Star Schema Tables in PostgreSQL\n")

# DDL Statements for Star Schema
ddl_statements = []

# ==============================================================================
# 1. DROP EXISTING TABLES (if any) - in reverse order due to FK constraints
# ==============================================================================
drop_tables = """
DROP TABLE IF EXISTS fact_sales CASCADE;
DROP TABLE IF EXISTS dim_customer CASCADE;
DROP TABLE IF EXISTS dim_product CASCADE;
DROP TABLE IF EXISTS dim_date CASCADE;
DROP TABLE IF EXISTS dim_geography CASCADE;
DROP TABLE IF EXISTS dim_order CASCADE;
"""

ddl_statements.append(("Drop existing tables", drop_tables))

# ==============================================================================
# 2. CREATE DIMENSION TABLES
# ==============================================================================

# --- dim_customer ---
create_dim_customer = """
CREATE TABLE dim_customer (
    customer_key SERIAL PRIMARY KEY,
    user_id VARCHAR(50) UNIQUE,
    birth_date DATE,
    customer_age INTEGER,
    gender VARCHAR(10),
    email VARCHAR(255),
    phone VARCHAR(50),
    first_purchase_date DATE,
    last_session_date TIMESTAMP,
    is_active BOOLEAN,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_customer_user_id ON dim_customer(user_id);
CREATE INDEX idx_customer_age ON dim_customer(customer_age);
"""

ddl_statements.append(("Create dim_customer", create_dim_customer))

# --- dim_product ---
create_dim_product = """
CREATE TABLE dim_product (
    product_key SERIAL PRIMARY KEY,
    product_id VARCHAR(50) UNIQUE,
    product_name VARCHAR(255),
    ean_upc VARCHAR(50),
    brand VARCHAR(100),
    category VARCHAR(100),
    segment VARCHAR(100),
    is_active BOOLEAN DEFAULT TRUE
);

CREATE INDEX idx_product_id ON dim_product(product_id);
CREATE INDEX idx_product_brand ON dim_product(brand);
CREATE INDEX idx_product_category ON dim_product(category);
"""

ddl_statements.append(("Create dim_product", create_dim_product))

# --- dim_date ---
create_dim_date = """
CREATE TABLE dim_date (
    date_key INTEGER PRIMARY KEY,
    full_date DATE UNIQUE NOT NULL,
    year INTEGER,
    quarter INTEGER,
    month INTEGER,
    month_name VARCHAR(20),
    week_of_year INTEGER,
    day_of_month INTEGER,
    day_of_week INTEGER,
    day_name VARCHAR(20),
    is_weekend BOOLEAN,
    is_holiday BOOLEAN DEFAULT FALSE,
    quarter_name VARCHAR(10),
    year_month VARCHAR(10)
);

CREATE INDEX idx_date_full_date ON dim_date(full_date);
CREATE INDEX idx_date_year_month ON dim_date(year, month);
"""

ddl_statements.append(("Create dim_date", create_dim_date))

# --- dim_geography ---
create_dim_geography = """
CREATE TABLE dim_geography (
    geography_key SERIAL PRIMARY KEY,
    address_id VARCHAR(50),
    user_id VARCHAR(50),
    country VARCHAR(100),
    state VARCHAR(100),
    city VARCHAR(100),
    neighborhood VARCHAR(100),
    postal_code VARCHAR(20),
    street VARCHAR(255),
    latitude DECIMAL(12,8),
    longitude DECIMAL(12,8),
    address_type VARCHAR(50),
    is_default BOOLEAN
);

CREATE INDEX idx_geography_user_id ON dim_geography(user_id);
CREATE INDEX idx_geography_country ON dim_geography(country);
CREATE INDEX idx_geography_city ON dim_geography(city);
"""

ddl_statements.append(("Create dim_geography", create_dim_geography))

# --- dim_order ---
create_dim_order = """
CREATE TABLE dim_order (
    order_key SERIAL PRIMARY KEY,
    order_id VARCHAR(50) UNIQUE,
    creation_date TIMESTAMP,
    authorized_date TIMESTAMP,
    invoiced_date TIMESTAMP,
    order_status VARCHAR(50),
    payment_method VARCHAR(50),
    shipping_estimated_date DATE,
    shipping_estimated_min DATE,
    shipping_estimated_max DATE,
    days_to_shipping INTEGER,
    order_year INTEGER,
    order_month INTEGER,
    order_quarter INTEGER,
    order_day_of_week INTEGER,
    channel VARCHAR(50),
    seller_id VARCHAR(50)
);

CREATE INDEX idx_order_id ON dim_order(order_id);
CREATE INDEX idx_order_creation_date ON dim_order(creation_date);
CREATE INDEX idx_order_status ON dim_order(order_status);
"""

ddl_statements.append(("Create dim_order", create_dim_order))

# ==============================================================================
# 3. CREATE FACT TABLE WITH INCREASED DECIMAL PRECISION
# ==============================================================================

create_fact_sales = """
CREATE TABLE fact_sales (
    sale_id SERIAL PRIMARY KEY,
    order_id VARCHAR(50),
    customer_key INTEGER REFERENCES dim_customer(customer_key),
    product_key INTEGER REFERENCES dim_product(product_key),
    date_key INTEGER REFERENCES dim_date(date_key),
    geography_key INTEGER REFERENCES dim_geography(geography_key),
    order_key INTEGER REFERENCES dim_order(order_key),
    -- Measures (DECIMAL(15,2) to handle large price values)
    quantity INTEGER,
    unit_price DECIMAL(15,2),
    list_price DECIMAL(15,2),
    selling_price DECIMAL(15,2),
    discount_amount DECIMAL(15,2),
    shipping_price DECIMAL(15,2),
    total_amount DECIMAL(15,2),
    is_gift BOOLEAN
);

-- Create indexes on foreign keys for join performance
CREATE INDEX idx_fact_customer_key ON fact_sales(customer_key);
CREATE INDEX idx_fact_product_key ON fact_sales(product_key);
CREATE INDEX idx_fact_date_key ON fact_sales(date_key);
CREATE INDEX idx_fact_geography_key ON fact_sales(geography_key);
CREATE INDEX idx_fact_order_key ON fact_sales(order_key);
CREATE INDEX idx_fact_order_id ON fact_sales(order_id);
"""

ddl_statements.append(("Create fact_sales", create_fact_sales))

# ==============================================================================
# 4. EXECUTE ALL DDL STATEMENTS
# ==============================================================================

print("Executing DDL statements...\n")

try:
    with engine.connect() as conn:
        for description, sql in ddl_statements:
            print(f"  ⚙️  {description}...")
            conn.execute(text(sql))
            conn.commit()
            print(f"     ✅ {description} - SUCCESS")
        
    print("\n" + "=" * 70)
    print("🎉 Star Schema created successfully in PostgreSQL!")
    print("=" * 70)
    
    # Verify tables were created
    with engine.connect() as conn:
        result = conn.execute(text("""
            SELECT table_name 
            FROM information_schema.tables 
            WHERE table_schema = 'public' 
            AND table_type = 'BASE TABLE'
            ORDER BY table_name;
        """))
        tables = [row[0] for row in result]
        
        print("\n📋 Tables created in database:")
        for table in tables:
            print(f"   • {table}")
             
except Exception as e:
    print(f"\n❌ Error creating tables: {e}")
    raise

🔧 Creating Star Schema Tables in PostgreSQL

Executing DDL statements...

  ⚙️  Drop existing tables...
     ✅ Drop existing tables - SUCCESS
  ⚙️  Create dim_customer...
     ✅ Create dim_customer - SUCCESS
  ⚙️  Create dim_product...
     ✅ Create dim_product - SUCCESS
  ⚙️  Create dim_date...
     ✅ Create dim_date - SUCCESS
  ⚙️  Create dim_geography...
     ✅ Create dim_geography - SUCCESS
  ⚙️  Create dim_order...
     ✅ Create dim_order - SUCCESS
  ⚙️  Create fact_sales...
     ✅ Create fact_sales - SUCCESS

🎉 Star Schema created successfully in PostgreSQL!

📋 Tables created in database:
   • dim_customer
   • dim_date
   • dim_geography
   • dim_order
   • dim_product
   • fact_sales


# 04. SQL Data Load

In [19]:
print("\n🔧 Inserting 'Unknown' records for missing data handling\n")

# Insert special "Unknown" records with fixed IDs for each dimension
# These will be used when foreign key lookups fail (missing data)

unknown_inserts = []

# Unknown Customer (customer_key will be = 1)
unknown_customer = """
INSERT INTO dim_customer (user_id, birth_date, customer_age, gender, email, phone, is_active)
VALUES ('UNKNOWN', NULL, NULL, 'Unknown', 'unknown@unknown.com', 'N/A', FALSE);
"""
unknown_inserts.append(("Unknown Customer", unknown_customer))

# Unknown Product (product_key will be = 1)
unknown_product = """
INSERT INTO dim_product (product_id, product_name, ean_upc, brand, category, segment, is_active)
VALUES ('UNKNOWN', 'Unknown Product', 'N/A', 'Unknown', 'Unknown', 'Unknown', FALSE);
"""
unknown_inserts.append(("Unknown Product", unknown_product))

# Unknown Date (date_key = 19000101 = January 1, 1900)
unknown_date = """
INSERT INTO dim_date (date_key, full_date, year, quarter, month, month_name, week_of_year, 
                      day_of_month, day_of_week, day_name, is_weekend, quarter_name, year_month)
VALUES (19000101, '1900-01-01', 1900, 1, 1, 'January', 1, 1, 1, 'Monday', FALSE, 'Q1', '1900-01');
"""
unknown_inserts.append(("Unknown Date", unknown_date))

# Unknown Geography (geography_key will be = 1)
unknown_geography = """
INSERT INTO dim_geography (address_id, user_id, country, state, city, neighborhood, 
                           postal_code, street, address_type, is_default)
VALUES ('UNKNOWN', 'UNKNOWN', 'Unknown', 'Unknown', 'Unknown', 'Unknown', 
        'N/A', 'Unknown', 'Unknown', FALSE);
"""
unknown_inserts.append(("Unknown Geography", unknown_geography))

# Unknown Order (order_key will be = 1)
unknown_order = """
INSERT INTO dim_order (order_id, order_status, payment_method, channel)
VALUES ('UNKNOWN', 'Unknown', 'Unknown', 'Unknown');
"""
unknown_inserts.append(("Unknown Order", unknown_order))

# Execute inserts
try:
    with engine.connect() as conn:
        for description, sql in unknown_inserts:
            print(f"  ⚙️  Inserting {description}...")
            conn.execute(text(sql))
            conn.commit()
            print(f"     ✅ {description} inserted")

    print("\n" + "=" * 70)
    print("✅ 'Unknown' records inserted successfully!")
    print("=" * 70)
    print("\nThese records will be used for NULL foreign key references")
    
except Exception as e:
    print(f"\n❌ Error inserting unknown records: {e}")
    raise


🔧 Inserting 'Unknown' records for missing data handling

  ⚙️  Inserting Unknown Customer...
     ✅ Unknown Customer inserted
  ⚙️  Inserting Unknown Product...
     ✅ Unknown Product inserted
  ⚙️  Inserting Unknown Date...
     ✅ Unknown Date inserted
  ⚙️  Inserting Unknown Geography...
     ✅ Unknown Geography inserted
  ⚙️  Inserting Unknown Order...
     ✅ Unknown Order inserted

✅ 'Unknown' records inserted successfully!

These records will be used for NULL foreign key references


In [20]:
print("📊 Loading Data into Star Schema\n")
print("=" * 70)
print("Loading order:")
print("  1. dim_date (generated)")
print("  2. dim_customer")
print("  3. dim_product")
print("  4. dim_geography")
print("  5. dim_order")
print("  6. fact_sales (with lookups)")
print("=" * 70)

📊 Loading Data into Star Schema

Loading order:
  1. dim_date (generated)
  2. dim_customer
  3. dim_product
  4. dim_geography
  5. dim_order
  6. fact_sales (with lookups)


## Dim Date

In [21]:
# ==============================================================================
# 1. LOAD dim_date - Generate date dimension programmatically
# ==============================================================================

print("\n⏰ Generating dim_date dimension...\n")

# Find date range from the data
date_columns_to_check = [
    ordersList['creationDate'],
    generalOrder['creationDate']
]

# Get min and max dates
all_dates = pd.concat(date_columns_to_check, ignore_index=True)
all_dates = pd.to_datetime(all_dates, errors='coerce')
all_dates = all_dates.dropna()

min_date = all_dates.min()
max_date = all_dates.max()

print(f"Data date range: {min_date.date()} to {max_date.date()}")

# Generate full date range (extend a bit for safety)
start_date = min_date - pd.DateOffset(days=30)  # 30 days before
end_date = max_date + pd.DateOffset(days=30)    # 30 days after

date_range = pd.date_range(start=start_date, end=end_date, freq='D')

print(f"Generating {len(date_range)} dates from {date_range[0].date()} to {date_range[-1].date()}\n")

# Build dim_date DataFrame
dim_date_data = []
for date in date_range:
    dim_date_data.append({
        'date_key': int(date.strftime('%Y%m%d')),
        'full_date': date.date(),
        'year': date.year,
        'quarter': date.quarter,
        'month': date.month,
        'month_name': date.strftime('%B'),
        'week_of_year': date.isocalendar()[1],
        'day_of_month': date.day,
        'day_of_week': date.dayofweek + 1,  # 1=Monday, 7=Sunday
        'day_name': date.strftime('%A'),
        'is_weekend': date.dayofweek >= 5,  # Saturday=5, Sunday=6
        'is_holiday': False,  # Could be enhanced with holiday calendar
        'quarter_name': f'Q{date.quarter}',
        'year_month': date.strftime('%Y-%m')
    })

dim_date_df = pd.DataFrame(dim_date_data)

# Load to database
print(f"Loading {len(dim_date_df)} records to dim_date...")
dim_date_df.to_sql('dim_date', engine, if_exists='append', index=False, method='multi', chunksize=1000)
print(f"✅ dim_date loaded: {len(dim_date_df):,} records")

# Verify
with engine.connect() as conn:
    result = conn.execute(text("SELECT COUNT(*) FROM dim_date;"))
    count = result.fetchone()[0]
    print(f"   Verified in database: {count:,} records")


⏰ Generating dim_date dimension...

Data date range: 2021-01-01 to 2022-11-03
Generating 732 dates from 2020-12-02 to 2022-12-03

Loading 732 records to dim_date...
✅ dim_date loaded: 732 records
   Verified in database: 733 records


## Dim Customer

In [22]:
# ==============================================================================
# 2. LOAD dim_customer
# ==============================================================================

print("\n👤 Loading dim_customer dimension...\n")

# Prepare customer dimension from individualCustomer
dim_customer_prep = individualCustomer[['userId', 'birthDate', 'customer_age', 'gender', 
                                         'email', 'phone', 'rclastsessiondate']].copy()

# Rename columns to match dimension schema
dim_customer_prep.rename(columns={
    'userId': 'user_id',
    'birthDate': 'birth_date',
    'rclastsessiondate': 'last_session_date'
}, inplace=True)

# Convert dates to proper format
dim_customer_prep['birth_date'] = pd.to_datetime(dim_customer_prep['birth_date'], errors='coerce')
dim_customer_prep['last_session_date'] = pd.to_datetime(dim_customer_prep['last_session_date'], errors='coerce')

# Add business logic columns
dim_customer_prep['is_active'] = dim_customer_prep['last_session_date'].notna()
dim_customer_prep['first_purchase_date'] = None  # Could be calculated from orders

# Remove duplicates on user_id
dim_customer_prep = dim_customer_prep.drop_duplicates(subset=['user_id'], keep='first')

# Handle NaN values
dim_customer_prep['gender'] = dim_customer_prep['gender'].fillna('Unknown')
dim_customer_prep['email'] = dim_customer_prep['email'].fillna('unknown@unknown.com')
dim_customer_prep['phone'] = dim_customer_prep['phone'].fillna('N/A')

print(f"Prepared {len(dim_customer_prep):,} unique customers")

# Load to database
print(f"Loading to database...")
dim_customer_prep.to_sql('dim_customer', engine, if_exists='append', index=False, method='multi', chunksize=5000)
print(f"✅ dim_customer loaded: {len(dim_customer_prep):,} records")

# Verify
with engine.connect() as conn:
    result = conn.execute(text("SELECT COUNT(*) FROM dim_customer WHERE user_id != 'UNKNOWN';"))
    count = result.fetchone()[0]
    print(f"   Verified in database: {count:,} records (excluding Unknown)")
    
    # Show sample
    result = conn.execute(text("SELECT customer_key, user_id, customer_age, gender FROM dim_customer LIMIT 5;"))
    print("\n   Sample records:")
    for row in result:
        print(f"     {row}")


👤 Loading dim_customer dimension...

Prepared 109,679 unique customers
Loading to database...
✅ dim_customer loaded: 109,679 records
   Verified in database: 109,678 records (excluding Unknown)

   Sample records:
     (1, 'UNKNOWN', None, 'Unknown')
     (2, 'f1ace526-a249-4cec-b47d-d4b00c035d9b', None, 'Unknown')
     (3, '3e3c5cf1-db7d-4718-9c32-597adc65ce36', 26, 'female')
     (4, 'd3c42b55-c52e-4695-b12e-11ab0112a1fe', None, 'Unknown')
     (5, 'eed5119f-c0d3-4316-809b-df59c9d69b06', None, 'Unknown')


## Dim Product

In [23]:
# ==============================================================================
# 3. LOAD dim_product
# ==============================================================================

print("\n🛒 Loading dim_product dimension...\n")

# Prepare product dimension from productCatalog
dim_product_prep = productCatalog[['IdMaterial', 'MATERIAL', 'EAN_UPC', 'BRAND', 
                                    'CATEGORY_PROJECT', 'SEGMENT_DESC']].copy()

# Rename columns to match dimension schema
dim_product_prep.rename(columns={
    'IdMaterial': 'product_id',
    'MATERIAL': 'product_name',
    'EAN_UPC': 'ean_upc',
    'BRAND': 'brand',
    'CATEGORY_PROJECT': 'category',
    'SEGMENT_DESC': 'segment'
}, inplace=True)

# Add is_active column (all products in catalog are active)
dim_product_prep['is_active'] = True

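# Handle NaN values
# Drop rows without a product_id: filling them with 'UNKNOWN' would collide
# with the UNIQUE constraint on the pre-inserted Unknown row
dim_product_prep = dim_product_prep.dropna(subset=['product_id'])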
dim_product_prep['product_name'] = dim_product_prep['product_name'].fillna('Unknown Product')
dim_product_prep['ean_upc'] = dim_product_prep['ean_upc'].fillna('N/A')
dim_product_prep['brand'] = dim_product_prep['brand'].fillna('Unknown')
dim_product_prep['category'] = dim_product_prep['category'].fillna('Unknown')
dim_product_prep['segment'] = dim_product_prep['segment'].fillna('Unknown')

# Remove duplicates
dim_product_prep = dim_product_prep.drop_duplicates(subset=['product_id'], keep='first')

print(f"Prepared {len(dim_product_prep):,} unique products")

# Load to database
print("Loading to database...")
dim_product_prep.to_sql('dim_product', engine, if_exists='append', index=False, method='multi', chunksize=1000)
print(f"✅ dim_product loaded: {len(dim_product_prep):,} records")

# Verify
with engine.connect() as conn:
    result = conn.execute(text("SELECT COUNT(*) FROM dim_product WHERE product_id != 'UNKNOWN';"))
    count = result.fetchone()[0]
    print(f"   Verified in database: {count:,} records (excluding Unknown)")
    
    # Show sample
    result = conn.execute(text("SELECT product_key, product_id, product_name, brand, category FROM dim_product LIMIT 5;"))
    print("\n   Sample records:")
    for row in result:
        print(f"     {row[0]} | {row[1]} | {row[2][:30]}... | {row[3]} | {row[4]}")


🛒 Loading dim_product dimension...

Prepared 7,158 unique products
Loading to database...
✅ dim_product loaded: 7,158 records
   Verified in database: 7,158 records (excluding Unknown)

   Sample records:
     1 | UNKNOWN | Unknown Product... | Unknown | Unknown
     2 | 1 | WC07001Q... | WHR | T-06 FREEZER
     3 | 2 | WA1045Q... | WHR | T-05 AT
     4 | 3 | WA2043Q... | WHR | T-05 AT
     5 | 4 | WC10001Q... | WHR | T-06 FREEZER


## Dim Geography

In [24]:
# ==============================================================================
# 4. LOAD dim_geography
# ==============================================================================

print("\n📍 Loading dim_geography dimension...\n")

# Prepare geography dimension from customerAddress
dim_geography_prep = customerAddress[['id', 'userId', 'country', 'state', 'city', 
                                            'neighborhood', 'postalCode', 'addressName', 
                                            'geoCoordinate']].copy()

# Rename columns
dim_geography_prep.rename(columns={
    'id': 'address_id',
    'userId': 'user_id',
    'postalCode': 'postal_code',
    'addressName': 'street',
    'geoCoordinate': 'geoCoordinates'  # Rename for consistency with processing
}, inplace=True)

# Parse latitude/longitude from geoCoordinates (assuming format like [-23.5,-46.6])
def parse_coordinates(coord_str):
    try:
        if pd.isna(coord_str):
            return None, None
        # Remove brackets and split
        coords = str(coord_str).strip('[]').split(',')
        if len(coords) == 2:
            lat = float(coords[0])
            lon = float(coords[1])
            
            # Validate coordinate ranges
            # Latitude: -90 to 90
            # Longitude: -180 to 180
            if -90 <= lat <= 90 and -180 <= lon <= 180:
                return lat, lon
            else:
                # Invalid range, return None
                return None, None
    except (ValueError, TypeError):
        # Malformed or non-numeric input: fall through to the (None, None) default
        pass
    return None, None

dim_geography_prep[['latitude', 'longitude']] = dim_geography_prep['geoCoordinates'].apply(
    lambda x: pd.Series(parse_coordinates(x))
)

# Drop original geoCoordinates column
dim_geography_prep = dim_geography_prep.drop(columns=['geoCoordinates'])

# Add additional columns
dim_geography_prep['address_type'] = 'Residential'  # Default assumption
dim_geography_prep['is_default'] = False  # Could be enhanced with business logic

# Handle NaN values
dim_geography_prep['address_id'] = dim_geography_prep['address_id'].fillna('UNKNOWN')
dim_geography_prep['user_id'] = dim_geography_prep['user_id'].fillna('UNKNOWN')
dim_geography_prep['country'] = dim_geography_prep['country'].fillna('Unknown')
dim_geography_prep['state'] = dim_geography_prep['state'].fillna('Unknown')
dim_geography_prep['city'] = dim_geography_prep['city'].fillna('Unknown')
dim_geography_prep['neighborhood'] = dim_geography_prep['neighborhood'].fillna('Unknown')
dim_geography_prep['postal_code'] = dim_geography_prep['postal_code'].fillna('N/A')
dim_geography_prep['street'] = dim_geography_prep['street'].fillna('Unknown')

# Remove duplicates on address_id
dim_geography_prep = dim_geography_prep.drop_duplicates(subset=['address_id'], keep='first')

# Log coordinate statistics
valid_coords = dim_geography_prep[['latitude', 'longitude']].notna().all(axis=1).sum()
print(f"Prepared {len(dim_geography_prep):,} unique addresses")
print(f"  Valid coordinates: {valid_coords:,} ({valid_coords/len(dim_geography_prep)*100:.1f}%)")

# Load to database
print("\nLoading to database...")
dim_geography_prep.to_sql('dim_geography', engine, if_exists='append', index=False, method='multi', chunksize=5000)
print(f"✅ dim_geography loaded: {len(dim_geography_prep):,} records")

# Verify
with engine.connect() as conn:
    result = conn.execute(text("SELECT COUNT(*) FROM dim_geography WHERE address_id != 'UNKNOWN';"))
    count = result.fetchone()[0]
    print(f"   Verified in database: {count:,} records (excluding Unknown)")
    
    # Show sample
    result = conn.execute(text("SELECT geography_key, user_id, city, state, country FROM dim_geography LIMIT 5;"))
    print("\n   Sample records:")
    for row in result:
        print(f"     {row}")


📍 Loading dim_geography dimension...

Prepared 56,233 unique addresses
  Valid coordinates: 24,277 (43.2%)

Loading to database...
✅ dim_geography loaded: 56,233 records
   Verified in database: 56,233 records (excluding Unknown)

   Sample records:
     (1, 'UNKNOWN', 'Unknown', 'Unknown', 'Unknown')
     (2, '70f350f5-f62e-11ec-835d-0a8fb171123f', 'GUADALUPE', 'NUEVO LEÓN', 'MEX')
     (3, 'c21ef477-e9c4-11ec-835d-02978ed58bf1', 'SAN NICOLÁS DE LOS GARZA', 'NUEVO LEÓN', 'MEX')
     (4, '66021709-f670-11ec-835d-16b245a39a51', 'MONTERREY', 'NUEVO LEÓN', 'MEX')
     (5, '79e873e1-e2c7-11ec-835d-1205375cb899', 'XOCHIMILCO', 'CIUDAD DE MÉXICO', 'MEX')
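
The row-wise `parse_coordinates` call above goes through `apply`, which can become slow on larger address tables. Below is a minimal vectorized sketch, assuming the same `[lat,lon]` string format. The names `split_coordinates` and `COORD_PATTERN` are hypothetical, and the regex only accepts plain decimal numbers, so it is somewhat stricter than `float()`.

```python
import pandas as pd

# Hypothetical pattern: optional brackets around two plain decimal numbers
COORD_PATTERN = (
    r"^\[?\s*(?P<latitude>[-+]?\d+(?:\.\d+)?)\s*,"
    r"\s*(?P<longitude>[-+]?\d+(?:\.\d+)?)\s*\]?$"
)

def split_coordinates(series: pd.Series) -> pd.DataFrame:
    """Return latitude/longitude columns; NaN where input is missing, malformed, or out of range."""
    # Non-matching rows (including missing values) come back as NaN in both columns
    coords = series.astype(str).str.extract(COORD_PATTERN)
    coords = coords.apply(pd.to_numeric, errors="coerce")
    in_range = coords["latitude"].between(-90, 90) & coords["longitude"].between(-180, 180)
    # Blank out both columns when either value is outside its valid range
    coords.loc[~in_range, ["latitude", "longitude"]] = float("nan")
    return coords

sample = pd.Series(["[19.43,-99.13]", None, "[200,10]", "bad"])
parsed = split_coordinates(sample)
print(parsed)
```

Only the first row of the sample yields a coordinate pair; the other three rows come back as NaN in both columns.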


## Dim Order

In [25]:
# ==============================================================================
# 5. LOAD dim_order
# ==============================================================================

print("\n📦 Loading dim_order dimension...\n")

# Select columns from ordersList
orders_from_list = ordersList[[
    'orderId', 'creationDate', 'authorizedDate', 'status', 'paymentNames',
    'salesChannel', 'ShippingEstimatedDate', 'ShippingEstimatedDateMin', 
    'ShippingEstimatedDateMax', 'days_to_shipping'
]].copy()

# Select columns from generalOrder
orders_from_general = generalOrder[[
    'orderId', 'invoicedDate', 'order_year', 'order_month', 
    'order_quarter', 'order_dayofweek'
]].copy()

# Merge both sources
orders_merged = orders_from_list.merge(
    orders_from_general,
    on='orderId',
    how='left'
)

# Prepare order dimension
dim_order_prep = orders_merged.copy()

# Rename columns to match schema
dim_order_prep.rename(columns={
    'orderId': 'order_id',
    'creationDate': 'creation_date',
    'authorizedDate': 'authorized_date',
    'invoicedDate': 'invoiced_date',
    'status': 'order_status',
    'paymentNames': 'payment_method',
    'ShippingEstimatedDate': 'shipping_estimated_date',
    'ShippingEstimatedDateMin': 'shipping_estimated_min',
    'ShippingEstimatedDateMax': 'shipping_estimated_max',
    'order_dayofweek': 'order_day_of_week',
    'salesChannel': 'channel'
}, inplace=True)

# Add seller_id column (not available in source data)
dim_order_prep['seller_id'] = 'Unknown'

# Convert dates to proper datetime format
dim_order_prep['creation_date'] = pd.to_datetime(dim_order_prep['creation_date'], errors='coerce')
dim_order_prep['authorized_date'] = pd.to_datetime(dim_order_prep['authorized_date'], errors='coerce')
dim_order_prep['invoiced_date'] = pd.to_datetime(dim_order_prep['invoiced_date'], errors='coerce')
dim_order_prep['shipping_estimated_date'] = pd.to_datetime(dim_order_prep['shipping_estimated_date'], errors='coerce')
dim_order_prep['shipping_estimated_min'] = pd.to_datetime(dim_order_prep['shipping_estimated_min'], errors='coerce')
dim_order_prep['shipping_estimated_max'] = pd.to_datetime(dim_order_prep['shipping_estimated_max'], errors='coerce')

# Handle NaN/missing values
dim_order_prep['order_id'] = dim_order_prep['order_id'].fillna('UNKNOWN')
dim_order_prep['order_status'] = dim_order_prep['order_status'].fillna('Unknown')
dim_order_prep['payment_method'] = dim_order_prep['payment_method'].fillna('Unknown')
dim_order_prep['channel'] = dim_order_prep['channel'].fillna('Unknown')

# Remove duplicates on order_id (keep first occurrence)
dim_order_prep = dim_order_prep.drop_duplicates(subset=['order_id'], keep='first')

print(f"Prepared {len(dim_order_prep):,} unique orders")
print(f"  Source: ordersList ({len(orders_from_list):,} records) + generalOrder ({len(orders_from_general):,} records)")

# Load to database
print("\nLoading to database...")
dim_order_prep.to_sql('dim_order', engine, if_exists='append', index=False, method='multi', chunksize=5000)
print(f"✅ dim_order loaded: {len(dim_order_prep):,} records")

# Verify
with engine.connect() as conn:
    result = conn.execute(text("SELECT COUNT(*) FROM dim_order WHERE order_id != 'UNKNOWN';"))
    count = result.fetchone()[0]
    print(f"   Verified in database: {count:,} records (excluding Unknown)")
    
    # Show sample with stats
    result = conn.execute(text("""
        SELECT order_status, COUNT(*) as count 
        FROM dim_order 
        WHERE order_id != 'UNKNOWN'
        GROUP BY order_status 
        ORDER BY count DESC 
        LIMIT 5;
    """))
    print("\n   Top order statuses:")
    for row in result:
        print(f"     {row[0]}: {row[1]:,} orders")


📦 Loading dim_order dimension...

Prepared 61,475 unique orders
  Source: ordersList (67,831 records) + generalOrder (59,310 records)

Loading to database...
✅ dim_order loaded: 61,475 records
   Verified in database: 61,475 records (excluding Unknown)

   Top order statuses:
     invoiced: 23,152 orders
     handling: 18,374 orders
     canceled: 9,093 orders
     ready-for-handling: 8,222 orders
     payment-pending: 1,064 orders
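
The two sources differ in size (67,831 rows in `ordersList`, 59,310 in `generalOrder`), so the left join leaves some orders without the `generalOrder` columns. The sketch below uses toy data to show how the `indicator` and `validate` options of `DataFrame.merge` could report that coverage. The frames and the `merge_with_coverage` helper are illustrative, not the notebook's data.

```python
import pandas as pd

def merge_with_coverage(left: pd.DataFrame, right: pd.DataFrame, key: str):
    """Left-join `right` onto `left` and count how many left rows found a match."""
    merged = left.merge(
        right,
        on=key,
        how="left",
        validate="many_to_one",  # raises MergeError if `right` repeats a key
        indicator=True,          # adds a `_merge` column: 'both' or 'left_only'
    )
    coverage = merged["_merge"].value_counts()
    return merged.drop(columns="_merge"), coverage

# Toy frames standing in for ordersList and generalOrder
orders_list = pd.DataFrame({
    "orderId": ["A", "B", "C"],
    "status": ["invoiced", "handling", "canceled"],
})
general_order = pd.DataFrame({"orderId": ["A", "C"], "order_year": [2022, 2022]})

merged, coverage = merge_with_coverage(orders_list, general_order, "orderId")
print(coverage)
```

Rows counted as `left_only` are the ones that would carry nulls in `invoiced_date` and the `order_*` columns after the real merge.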


## Fact Sales

## Data Quality Investigation - Duplicates Check

In [None]:
print("🔍 INVESTIGATION: WHY IS REVENUE SO HIGH?\n")
print("=" * 70)

# 1. Check whether generalOrder still has duplicates after cleaning
print("\n1. Checking for duplicates in generalOrder:")
dup_count = generalOrder['orderId'].duplicated().sum()
print(f"   Duplicate orderIds: {dup_count:,}")
if dup_count == 0:
    print("   ✅ No duplicates - the cleaning worked")
else:
    print(f"   ❌ There are still {dup_count:,} duplicates!")

# 2. Analyze the price distribution
print("\n2. PRICE statistics in productOrderDetail:")
print("\n   SELLING PRICE:")
print(productOrderDetail['sellingPrice'].describe())

print("\n3. Top 10 most expensive products:")
top10 = productOrderDetail.nlargest(10, 'sellingPrice')[['orderId', 'productId', 'sellingPrice', 'quantity']]
for idx, row in top10.iterrows():
    print(f"   OrderId: {row['orderId']}, ProductId: {row['productId']}, Price: ${row['sellingPrice']:,.2f}, Qty: {row['quantity']}")

# 4. Compute total revenue directly
print("\n4. Revenue computed directly from productOrderDetail:")
direct_revenue = (productOrderDetail['sellingPrice'] * productOrderDetail['quantity']).sum()
print(f"   Total: ${direct_revenue:,.2f}")

# 5. Check whether the problem is a currency-unit conversion
print("\n5. Are the prices in CENTAVOS instead of PESOS?")
sample_prices = productOrderDetail['sellingPrice'].head(20)
print(f"   Sample of 20 prices: {sample_prices.tolist()}")
print("   If these prices are in centavos, divide by 100")

# 6. Compare join counts
print("\n6. Checking row counts in the join:")
print(f"   productOrderDetail: {len(productOrderDetail):,} rows")
print(f"   generalOrder: {len(generalOrder):,} rows")
print(f"   generalOrder unique orderIds: {generalOrder['orderId'].nunique():,}")

# Simulate the merge
test_merge = productOrderDetail[['orderId']].merge(
    generalOrder[['orderId', 'ClientId']], 
    on='orderId', 
    how='left'
)
print(f"   After the merge: {len(test_merge):,} rows")
print(f"   Increase: +{len(test_merge) - len(productOrderDetail):,}")

print("\n" + "=" * 70)
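
Step 5 of the cell above asks whether prices are stored in centavos rather than pesos. The notebook does not settle that question, so the sketch below only shows how the two readings could be put side by side. The helper `revenue_in_both_units`, the factor of 100, and the toy values are assumptions for illustration.

```python
import pandas as pd

def revenue_in_both_units(prices: pd.Series, quantities: pd.Series, minor_per_major: int = 100) -> dict:
    """Total revenue as recorded, and the same total if prices were stored in minor units."""
    raw_total = float((prices * quantities).sum())
    return {
        "as_recorded": raw_total,
        "if_minor_units": raw_total / minor_per_major,
    }

# Made-up line items, not taken from productOrderDetail
toy = pd.DataFrame({"sellingPrice": [150000, 89900], "quantity": [1, 2]})
totals = revenue_in_both_units(toy["sellingPrice"], toy["quantity"])
print(totals)
```

Comparing both totals against a figure known from the business side is one way to decide which unit the source actually uses.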

In [26]:
# ==============================================================================
# 6. LOAD fact_sales - WITH LOOKUPS TO ALL DIMENSIONS
# ==============================================================================

print("\n💰 Loading fact_sales (with dimension lookups)...\n")

# Step 1: Create lookup dictionaries from dimensions
print("Step 1: Creating lookup dictionaries from dimensions...")

# Customer lookup: user_id -> customer_key
with engine.connect() as conn:
    result = conn.execute(text("SELECT user_id, customer_key FROM dim_customer;"))
    customer_lookup = {row[0]: row[1] for row in result}
    unknown_customer_key = customer_lookup.get('UNKNOWN', 1)
    print(f"  ✓ Customer lookup: {len(customer_lookup):,} mappings")

# Product lookup: product_id -> product_key
with engine.connect() as conn:
    result = conn.execute(text("SELECT product_id, product_key FROM dim_product;"))
    product_lookup = {row[0]: row[1] for row in result}
    unknown_product_key = product_lookup.get('UNKNOWN', 1)
    print(f"  ✓ Product lookup: {len(product_lookup):,} mappings")

# Date lookup: full_date -> date_key (helper function)
def get_date_key(date_value):
    """Convert a date to a YYYYMMDD integer; return 19000101 for missing or unparseable values."""
    try:
        if pd.isna(date_value):
            return 19000101  # Unknown date key
        dt = pd.to_datetime(date_value)
        return int(dt.strftime('%Y%m%d'))
    except (ValueError, TypeError):
        return 19000101  # Unknown date key

# Geography lookup: user_id -> geography_key (first address per user).
# Note: DISTINCT ON is PostgreSQL-specific syntax.
with engine.connect() as conn:
    result = conn.execute(text("""
        SELECT DISTINCT ON (user_id) user_id, geography_key 
        FROM dim_geography 
        WHERE user_id != 'UNKNOWN'
        ORDER BY user_id, geography_key;
    """))
    geography_lookup = {row[0]: row[1] for row in result}
    # The query excludes 'UNKNOWN', so this .get() always returns the default (1),
    # which is the geography_key of the Unknown row in dim_geography.
    unknown_geography_key = geography_lookup.get('UNKNOWN', 1)
    print(f"  ✓ Geography lookup: {len(geography_lookup):,} mappings")

# Order lookup: order_id -> order_key
with engine.connect() as conn:
    result = conn.execute(text("SELECT order_id, order_key FROM dim_order;"))
    order_lookup = {row[0]: row[1] for row in result}
    unknown_order_key = order_lookup.get('UNKNOWN', 1)
    print(f"  ✓ Order lookup: {len(order_lookup):,} mappings")

print("\n  ✅ All lookup dictionaries created")


💰 Loading fact_sales (with dimension lookups)...

Step 1: Creating lookup dictionaries from dimensions...
  ✓ Customer lookup: 109,680 mappings
  ✓ Product lookup: 7,159 mappings
  ✓ Geography lookup: 50,153 mappings
  ✓ Order lookup: 61,476 mappings

  ✅ All lookup dictionaries created
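
`get_date_key` is applied one value at a time. A vectorized alternative could compute the same `YYYYMMDD` keys for a whole column at once. This is a sketch under the same convention (19000101 as the Unknown date key); the `date_keys` helper is a hypothetical name, not part of the notebook.

```python
import pandas as pd

def date_keys(series: pd.Series, unknown_key: int = 19000101) -> pd.Series:
    """Map a column of dates to YYYYMMDD integers, using `unknown_key` for missing or unparseable values."""
    dt = pd.to_datetime(series, errors="coerce")  # unparseable values become NaT
    keys = dt.dt.year * 10000 + dt.dt.month * 100 + dt.dt.day
    return keys.fillna(unknown_key).astype(int)

sample_dates = pd.Series(["2022-11-03", None, "not a date"])
sample_keys = date_keys(sample_dates)
print(sample_keys.tolist())
```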


In [27]:
# Step 2: Prepare fact_sales data
print("\nStep 2: Preparing fact_sales data from productOrderDetail...\n")

# Select relevant columns from productOrderDetail
fact_sales_prep = productOrderDetail[[
    'orderId', 'productId', 'quantity', 'price', 'listPrice', 
    'sellingPrice', 'shippingPrice', 'isGift'
]].copy()

print(f"  Initial records from productOrderDetail: {len(fact_sales_prep):,}")

# Join with generalOrder to get ClientId (user_id) and creationDate
fact_sales_prep = fact_sales_prep.merge(
    generalOrder[['orderId', 'ClientId', 'creationDate']],
    on='orderId',
    how='left'
)

print(f"  After joining with generalOrder: {len(fact_sales_prep):,}")

# Rename ClientId to user_id for clarity
fact_sales_prep.rename(columns={'ClientId': 'user_id'}, inplace=True)

# Convert creationDate to datetime
fact_sales_prep['creationDate'] = pd.to_datetime(fact_sales_prep['creationDate'], errors='coerce')

# Convert product_id to string
fact_sales_prep['productId'] = fact_sales_prep['productId'].astype(str)

print("  ✓ Data prepared for dimension lookups")


Step 2: Preparing fact_sales data from productOrderDetail...

  Initial records from productOrderDetail: 87,609
  After joining with generalOrder: 87,609
  ✓ Data prepared for dimension lookups


In [28]:
# Step 3: Apply dimension lookups to get foreign keys
print("\nStep 3: Applying dimension lookups to create foreign keys...\n")

# Lookup customer_key
fact_sales_prep['customer_key'] = fact_sales_prep['user_id'].map(customer_lookup).fillna(unknown_customer_key).astype(int)

# Lookup product_key
fact_sales_prep['product_key'] = fact_sales_prep['productId'].map(product_lookup).fillna(unknown_product_key).astype(int)

# Lookup date_key
fact_sales_prep['date_key'] = fact_sales_prep['creationDate'].apply(get_date_key)

# Lookup geography_key (using user_id)
fact_sales_prep['geography_key'] = fact_sales_prep['user_id'].map(geography_lookup).fillna(unknown_geography_key).astype(int)

# Lookup order_key
fact_sales_prep['order_key'] = fact_sales_prep['orderId'].map(order_lookup).fillna(unknown_order_key).astype(int)

# Calculate discount_amount (listPrice - sellingPrice)
fact_sales_prep['discount_amount'] = fact_sales_prep['listPrice'] - fact_sales_prep['sellingPrice']
fact_sales_prep['discount_amount'] = fact_sales_prep['discount_amount'].clip(lower=0)  # No negative discounts

# Calculate total_amount (sellingPrice * quantity + shippingPrice)
fact_sales_prep['total_amount'] = (fact_sales_prep['sellingPrice'] * fact_sales_prep['quantity']) + fact_sales_prep['shippingPrice'].fillna(0)

# Prepare final fact table with correct column names
fact_sales_final = fact_sales_prep[[
    'orderId', 'customer_key', 'product_key', 'date_key', 'geography_key', 'order_key',
    'quantity', 'price', 'listPrice', 'sellingPrice', 'discount_amount', 
    'shippingPrice', 'total_amount', 'isGift'
]].copy()

# Rename columns to match schema
fact_sales_final.rename(columns={
    'orderId': 'order_id',
    'price': 'unit_price',
    'listPrice': 'list_price',
    'sellingPrice': 'selling_price',
    'shippingPrice': 'shipping_price',
    'isGift': 'is_gift'
}, inplace=True)

# Handle missing values
fact_sales_final['unit_price'] = fact_sales_final['unit_price'].fillna(0)
fact_sales_final['list_price'] = fact_sales_final['list_price'].fillna(0)
fact_sales_final['selling_price'] = fact_sales_final['selling_price'].fillna(0)
fact_sales_final['shipping_price'] = fact_sales_final['shipping_price'].fillna(0)
fact_sales_final['discount_amount'] = fact_sales_final['discount_amount'].fillna(0)
fact_sales_final['total_amount'] = fact_sales_final['total_amount'].fillna(0)
fact_sales_final['quantity'] = fact_sales_final['quantity'].fillna(1).astype(int)
fact_sales_final['is_gift'] = fact_sales_final['is_gift'].fillna(False)

# Report statistics
print(f"  ✓ Final fact_sales records: {len(fact_sales_final):,}")
print(f"  ✓ Unique orders: {fact_sales_final['order_id'].nunique():,}")
print(f"  ✓ Unique customers: {fact_sales_final['customer_key'].nunique():,}")
print(f"  ✓ Unique products: {fact_sales_final['product_key'].nunique():,}")
print(f"  ✓ Date range: {fact_sales_final['date_key'].min()} to {fact_sales_final['date_key'].max()}")
print(f"  ✓ Total revenue: ${fact_sales_final['total_amount'].sum():,.2f}")


Step 3: Applying dimension lookups to create foreign keys...

  ✓ Final fact_sales records: 87,609
  ✓ Unique orders: 61,578
  ✓ Unique customers: 1,979
  ✓ Unique products: 1,225
  ✓ Date range: 19000101 to 20221103
  ✓ Total revenue: $4,190,403,906,678.00
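
Each lookup in Step 3 maps unmatched IDs to the Unknown key without reporting how many rows took that fallback. The output above shows 1,979 unique customer keys across 87,609 rows and a minimum date key of 19000101, so a per-dimension match rate may be worth printing before loading. A minimal sketch follows; `map_with_report` is a hypothetical helper and the toy lookup is not the notebook's data.

```python
import pandas as pd

def map_with_report(ids: pd.Series, lookup: dict, unknown_key: int, label: str):
    """Map business IDs to surrogate keys and report how many rows fell back to the Unknown key."""
    mapped = ids.map(lookup)          # IDs missing from the lookup become NaN
    missed = mapped.isna()
    n_missed = int(missed.sum())
    print(f"{label}: {n_missed:,} of {len(ids):,} rows unmatched ({missed.mean():.1%})")
    return mapped.fillna(unknown_key).astype(int), n_missed

# Toy data: two known users, one unknown ID, one missing ID
toy_lookup = {"UNKNOWN": 1, "u1": 2, "u2": 3}
toy_ids = pd.Series(["u1", "u2", "zz", None])
toy_keys, toy_missed = map_with_report(toy_ids, toy_lookup, unknown_key=1, label="customer_key")
```

A high unmatched share for one dimension would point at a mismatch between the ID columns being joined, for example differing formats or dtypes, rather than at genuinely unknown entities.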


In [29]:
# Step 4: Load to database
print("\nStep 4: Loading fact_sales to database...")
print(f"  Loading {len(fact_sales_final):,} records in batches...\n")

# Load in chunks to avoid memory issues
chunk_size = 10000
total_chunks = (len(fact_sales_final) + chunk_size - 1) // chunk_size

for i in range(total_chunks):
    start_idx = i * chunk_size
    end_idx = min((i + 1) * chunk_size, len(fact_sales_final))
    chunk = fact_sales_final.iloc[start_idx:end_idx]
    
    chunk.to_sql('fact_sales', engine, if_exists='append', index=False, method='multi')
    
    progress = (i + 1) / total_chunks * 100
    print(f"  Progress: {progress:5.1f}% ({end_idx:,} / {len(fact_sales_final):,} records)", end='\r')

print(f"\n\n✅ fact_sales loaded: {len(fact_sales_final):,} records")

# Verify
with engine.connect() as conn:
    result = conn.execute(text("SELECT COUNT(*) FROM fact_sales;"))
    count = result.fetchone()[0]
    print(f"   Verified in database: {count:,} records")
    
    # Calculate total revenue
    result = conn.execute(text("SELECT SUM(total_amount) FROM fact_sales;"))
    total_revenue = result.fetchone()[0]
    print(f"   Total revenue: ${total_revenue:,.2f}" if total_revenue else "   Total revenue: $0.00")
    
    # Show sample
    result = conn.execute(text("""
        SELECT sale_id, order_id, customer_key, product_key, quantity, total_amount 
        FROM fact_sales 
        LIMIT 5;
    """))
    print("\n   Sample records:")
    for row in result:
        print(f"     Sale {row[0]} | Order: {row[1]} | Customer: {row[2]} | Product: {row[3]} | Qty: {row[4]} | Total: ${row[5]:.2f}")

print("\n" + "=" * 70)
print("🎉 ALL DATA LOADED SUCCESSFULLY INTO STAR SCHEMA!")
print("=" * 70)


Step 4: Loading fact_sales to database...
  Loading 87,609 records in batches...

  Progress: 100.0% (87,609 / 87,609 records)

✅ fact_sales loaded: 87,609 records
   Verified in database: 87,609 records
   Total revenue: $4,190,403,906,678.00

   Sample records:
     Sale 1 | Order: 1100000450614-01 | Customer: 1 | Product: 707 | Qty: 1 | Total: $1224900.00
     Sale 2 | Order: 1100000450614-01 | Customer: 1 | Product: 707 | Qty: 1 | Total: $1224900.00
     Sale 3 | Order: 1100031691608-01 | Customer: 1 | Product: 1090 | Qty: 1 | Total: $278000.00
     Sale 4 | Order: 1100031691608-01 | Customer: 1 | Product: 1090 | Qty: 1 | Total: $278000.00
     Sale 5 | Order: 1100183075120-01 | Customer: 1 | Product: 209 | Qty: 1 | Total: $2414900.00

🎉 ALL DATA LOADED SUCCESSFULLY INTO STAR SCHEMA!


In [30]:
# Check generalOrder (the "cleaned" data) for duplicates
print("🔍 CHECKING FOR DUPLICATES IN generalOrder\n")
print("=" * 70)

# Check 1: Fully duplicated rows (all columns)
total_dups = generalOrder.duplicated().sum()
print(f"\n1. Exact duplicates (all columns): {total_dups:,}")

# Check 2: Duplicates by orderId (the business key)
duplicate_orderids = generalOrder['orderId'].duplicated().sum()
print(f"\n2. Duplicate orderIds: {duplicate_orderids:,}")

if duplicate_orderids > 0:
    print(f"\n   ⚠️ PROBLEM FOUND: generalOrder has {duplicate_orderids:,} duplicate orderIds!")
    
    # Show examples
    dup_ids = generalOrder[generalOrder['orderId'].duplicated(keep=False)]['orderId'].unique()[:5]
    print("\n   Examples of duplicate orderIds:")
    for oid in dup_ids:
        count = (generalOrder['orderId'] == oid).sum()
        print(f"     - '{oid}': {count} times")
    
    # Statistics
    dup_df = generalOrder[generalOrder['orderId'].duplicated(keep=False)]
    print(f"\n   Total records involved: {len(dup_df):,}")
    print(f"   Distinct duplicated orderIds: {dup_df['orderId'].nunique():,}")
else:
    print("\n   ✅ No duplicates by orderId")

print("\n" + "=" * 70)

🔍 CHECKING FOR DUPLICATES IN generalOrder


1. Exact duplicates (all columns): 0

2. Duplicate orderIds: 0

   ✅ No duplicates by orderId
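
The check above covers `generalOrder` only. The `fact_sales` sample output shows consecutive rows with the same order, product, quantity, and total, so a similar check at line-item level may be worth running on `productOrderDetail`. Whether such repeats are genuine separate line items or duplicated rows is not established in this notebook. The sketch below uses toy data, and `repeated_line_items` is a hypothetical helper.

```python
import pandas as pd

def repeated_line_items(details: pd.DataFrame, subset: list) -> pd.DataFrame:
    """List key combinations that occur more than once, with their row counts."""
    repeated = details[details.duplicated(subset=subset, keep=False)]
    return (
        repeated.groupby(subset, dropna=False)
        .size()
        .reset_index(name="n_rows")
        .sort_values("n_rows", ascending=False, ignore_index=True)
    )

# Toy line items: (A, 1) and (C, 3) each appear twice
toy_details = pd.DataFrame({
    "orderId": ["A", "A", "B", "C", "C", "C"],
    "productId": [1, 1, 2, 3, 3, 4],
    "quantity": [1, 1, 1, 2, 2, 1],
})
repeats = repeated_line_items(toy_details, ["orderId", "productId"])
print(repeats)
```

Adding `quantity` and `sellingPrice` to `subset` narrows the result to rows that are identical on every business column, which is the stronger signal of accidental duplication.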

