In [None]:
import pandas as pd
import os
import numpy as np
from sqlalchemy import create_engine, text
from dotenv import load_dotenv

# 1. Configuraci√≥n y Conexi√≥n
# ==========================================
print("üîå Estableciendo conexi√≥n con la base de datos...")
load_dotenv() # Carga las variables del archivo .env

# Verificamos credenciales
required_vars = ['DB_USER', 'DB_PASSWORD', 'DB_HOST', 'DB_PORT', 'DB_NAME']
if not all(os.getenv(var) for var in required_vars):
    raise ValueError("‚ùå Faltan variables en el archivo .env")

# Crear String de Conexi√≥n
DB_URL = f"postgresql+psycopg2://{os.getenv('DB_USER')}:{os.getenv('DB_PASSWORD')}@{os.getenv('DB_HOST')}:{os.getenv('DB_PORT')}/{os.getenv('DB_NAME')}"
engine = create_engine(DB_URL)

# 2. Carga del CSV Limpio
# ==========================================
csv_file = 'ventas_limpio_auto.csv'
print(f"üìÇ Leyendo archivo: {csv_file}...")

if os.path.exists(csv_file):
    df = pd.read_csv(csv_file)
    
    # üîß REPARACI√ìN: Manejo robusto de fechas
    print("   üîß Procesando columna de fecha...")
    
    # Primero, reemplazar "Nan" strings por valores NaN reales
    df['fecha'] = df['fecha'].replace('Nan', np.nan)
    
    # Luego convertir a datetime manejando errores
    df['fecha'] = pd.to_datetime(df['fecha'], errors='coerce', format='%Y-%m-%d')
    
    # Verificar si hay fechas inv√°lidas
    fechas_invalidas = df['fecha'].isna().sum()
    if fechas_invalidas > 0:
        print(f"   ‚ö†Ô∏è  Advertencia: {fechas_invalidas} registros con fechas inv√°lidas ser√°n eliminados")
        # Eliminar registros con fechas inv√°lidas
        df = df.dropna(subset=['fecha'])
    
    print(f"‚úÖ Datos cargados: {df.shape[0]} registros v√°lidos")
else:
    raise FileNotFoundError("‚ùå No se encuentra 'ventas_limpio_auto.csv'. Ejecuta la limpieza primero.")

# 3. Normalizaci√≥n (Creaci√≥n de Dimensiones)
# ==========================================
print("\nüîÑ Iniciando Normalizaci√≥n (Modelo Estrella)...")

# --- A. Dimensi√≥n Producto ---
# Extraemos √∫nicos de producto y tipo
dim_producto = df[['producto', 'tipo_producto']].drop_duplicates().reset_index(drop=True)
dim_producto['id_producto'] = dim_producto.index + 1 # Crear Primary Key
# Mapeamos el ID al dataframe principal
df = df.merge(dim_producto, on=['producto', 'tipo_producto'], how='left')

# --- B. Dimensi√≥n Geograf√≠a ---
# Extraemos √∫nicos de ciudad y pais
dim_geografia = df[['ciudad', 'pais']].drop_duplicates().reset_index(drop=True)
dim_geografia['id_geografia'] = dim_geografia.index + 1
df = df.merge(dim_geografia, on=['ciudad', 'pais'], how='left')

# --- C. Dimensi√≥n Canal/Cliente ---
# Extraemos √∫nicos de tipo de venta y cliente
dim_canal = df[['tipo_venta', 'tipo_cliente']].drop_duplicates().reset_index(drop=True)
dim_canal['id_canal'] = dim_canal.index + 1
df = df.merge(dim_canal, on=['tipo_venta', 'tipo_cliente'], how='left')

# --- D. Tabla de Hechos (Fact Table) ---
# Seleccionamos solo las m√©tricas y los IDs (Foreign Keys)
fact_ventas = df[[
    'fecha', 
    'id_producto', 
    'id_geografia', 
    'id_canal', 
    'cantidad', 
    'precio_unitario', 
    'descuento', 
    'costo_envio', 
    'total_ventas'
]].copy()

print("‚úÖ Datos normalizados en memoria.")

# 4. Carga a PostgreSQL (Load)
# ==========================================
print("\nüöÄ Subiendo datos a PostgreSQL (esto puede tardar unos minutos)...")

try:
    with engine.connect() as conn:
        # Opcional: Limpiar tablas anteriores si existen (Orden inverso a la creaci√≥n por las FK)
        print("   üßπ Limpiando tablas existentes...")
        conn.execute(text("DROP TABLE IF EXISTS fact_ventas CASCADE;"))
        conn.execute(text("DROP TABLE IF EXISTS dim_producto CASCADE;"))
        conn.execute(text("DROP TABLE IF EXISTS dim_geografia CASCADE;"))
        conn.execute(text("DROP TABLE IF EXISTS dim_canal CASCADE;"))
        conn.commit()

    # Subir Dimensiones
    print("   ‚¨ÜÔ∏è Subiendo Dimensi√≥n: Productos...")
    dim_producto.to_sql('dim_producto', engine, index=False, if_exists='replace')
    # A√±adir PK a la tabla creada
    with engine.connect() as conn:
        conn.execute(text("ALTER TABLE dim_producto ADD PRIMARY KEY (id_producto);"))
        conn.commit()

    print("   ‚¨ÜÔ∏è Subiendo Dimensi√≥n: Geograf√≠a...")
    dim_geografia.to_sql('dim_geografia', engine, index=False, if_exists='replace')
    with engine.connect() as conn:
        conn.execute(text("ALTER TABLE dim_geografia ADD PRIMARY KEY (id_geografia);"))
        conn.commit()

    print("   ‚¨ÜÔ∏è Subiendo Dimensi√≥n: Canal...")
    dim_canal.to_sql('dim_canal', engine, index=False, if_exists='replace')
    with engine.connect() as conn:
        conn.execute(text("ALTER TABLE dim_canal ADD PRIMARY KEY (id_canal);"))
        conn.commit()

    # Subir Hechos (Usamos chunksize para no saturar la memoria en 1.25M de filas)
    print("   ‚¨ÜÔ∏è Subiendo Tabla de Hechos: Ventas (por lotes)...")
    fact_ventas.to_sql('fact_ventas', engine, index=False, if_exists='replace', chunksize=10000)
    
    # A√±adir Foreign Keys (Integridad Referencial)
    print("   üîó Estableciendo relaciones (Foreign Keys)...")
    with engine.connect() as conn:
        conn.execute(text("""
            ALTER TABLE fact_ventas 
            ADD CONSTRAINT fk_producto FOREIGN KEY (id_producto) REFERENCES dim_producto(id_producto),
            ADD CONSTRAINT fk_geografia FOREIGN KEY (id_geografia) REFERENCES dim_geografia(id_geografia),
            ADD CONSTRAINT fk_canal FOREIGN KEY (id_canal) REFERENCES dim_canal(id_canal);
        """))
        conn.commit()

    print("\nüéâ ¬°PROCESO EXITOSO!")
    print("   Las tablas han sido creadas y pobladas en PostgreSQL.")
    print(f"   - dim_producto: {dim_producto.shape[0]} productos")
    print(f"   - dim_geografia: {dim_geografia.shape[0]} ubicaciones")
    print(f"   - dim_canal: {dim_canal.shape[0]} canales")
    print(f"   - fact_ventas: {fact_ventas.shape[0]} transacciones")

except Exception as e:
    print(f"\n‚ùå Ocurri√≥ un error durante la carga: {e}")
    import traceback
    traceback.print_exc()