In [1]:
"""
GENERADOR DE MODELO ESTRELLA - ANEMIA INFANTIL ENDES
Convierte CSV consolidado ‚Üí SQLite con esquema estrella
Autor: [Tu nombre]
Fecha: 2025
"""

import pandas as pd
import numpy as np
import sqlite3
from pathlib import Path
from datetime import datetime
import json
import warnings
warnings.filterwarnings('ignore')

# ============================================================
# CONFIGURACI√ìN
# ============================================================
class Config:
    # Rutas
    CSV_INPUT = r"D:\Bases_train_test\endes_2015_2024_consolidado.csv"
    DB_OUTPUT = r"D:\Data_Warehouse\anemia_dwh.db"
    METADATA_DIR = Path(r"D:\Data_Warehouse\metadata")
    BACKUP_DIR = Path(r"D:\Data_Warehouse\backup")
    
    # Crear directorios si no existen
    METADATA_DIR.mkdir(parents=True, exist_ok=True)
    BACKUP_DIR.mkdir(parents=True, exist_ok=True)
    
    # Configuraci√≥n de procesamiento
    CHUNK_SIZE = 5000  # Para inserci√≥n en SQLite
    BACKUP_PARQUET = True  # Crear backup en parquet

config = Config()

# ============================================================
# UTILIDADES
# ============================================================
class Logger:
    """Logger simple para ETL"""
    def __init__(self, log_file):
        self.log_file = log_file
        self.start_time = datetime.now()
        
    def log(self, mensaje):
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        msg = f"[{timestamp}] {mensaje}"
        print(msg)
        with open(self.log_file, 'a', encoding='utf-8') as f:
            f.write(msg + '\n')
    
    def finalizar(self):
        duracion = (datetime.now() - self.start_time).total_seconds()
        self.log(f"\n{'='*60}")
        self.log(f"Proceso completado en {duracion:.2f} segundos")
        self.log(f"{'='*60}")

# Inicializar logger
log_file = config.METADATA_DIR / f"log_etl_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
logger = Logger(log_file)

# ============================================================
# FUNCIONES DE DIMENSIONES
# ============================================================

def crear_dim_tiempo(df):
    """
    Dimensi√≥n TIEMPO
    Granularidad: A√±o (simplificado para ENDES)
    """
    logger.log("Creando DIM_TIEMPO...")
    
    anios = sorted(df['ANIO'].unique())
    
    dim_tiempo = []
    for anio in anios:
        for mes in range(1, 13):
            dim_tiempo.append({
                'id_tiempo': int(f"{anio}{mes:02d}"),  # 202401
                'anio': int(anio),
                'mes': mes,
                'trimestre': (mes - 1) // 3 + 1,
                'semestre': 1 if mes <= 6 else 2,
                'nombre_mes': ['Enero','Febrero','Marzo','Abril','Mayo','Junio',
                              'Julio','Agosto','Septiembre','Octubre','Noviembre','Diciembre'][mes-1],
                'quinquenio': f"{(anio//5)*5}-{(anio//5)*5+4}",
                'periodo_covid': int(anio in [2020, 2021])
            })
    
    df_tiempo = pd.DataFrame(dim_tiempo)
    logger.log(f"  ‚úì {len(df_tiempo)} registros creados")
    return df_tiempo


def crear_dim_geografia(df):
    """
    Dimensi√≥n GEOGRAF√çA
    Combina: departamento + √°rea + altitud
    """
    logger.log("Creando DIM_GEOGRAFIA...")
    
    # Seleccionar columnas geogr√°ficas
    geo_cols = ['HV024', 'HV025', 'HV040']
    
    # Combinaciones √∫nicas
    df_geo = df[geo_cols].drop_duplicates().reset_index(drop=True)
    
    # Asignar ID
    df_geo['id_geografia'] = range(1, len(df_geo) + 1)
    
    # Mapeo regi√≥n natural (simplificado - ajustar seg√∫n necesidad)
    mapa_region = {
        'tumbes': 'Costa', 'piura': 'Costa', 'lambayeque': 'Costa', 
        'la libertad': 'Costa', 'ancash': 'Costa', 'lima': 'Costa',
        'ica': 'Costa', 'arequipa': 'Costa', 'moquegua': 'Costa', 'tacna': 'Costa',
        'cajamarca': 'Sierra', 'huanuco': 'Sierra', 'pasco': 'Sierra',
        'junin': 'Sierra', 'huancavelica': 'Sierra', 'ayacucho': 'Sierra',
        'apurimac': 'Sierra', 'cusco': 'Sierra', 'puno': 'Sierra',
        'loreto': 'Selva', 'amazonas': 'Selva', 'san martin': 'Selva',
        'ucayali': 'Selva', 'madre de dios': 'Selva'
    }
    
    df_geo['region_natural'] = df_geo['HV024'].str.lower().map(mapa_region)
    df_geo['region_natural'] = df_geo['region_natural'].fillna('Otro')
    
    # Categorizar altitud
    df_geo['rango_altitud'] = pd.cut(
        df_geo['HV040'],
        bins=[-1, 500, 1500, 2500, 5000],
        labels=['<500m', '500-1500m', '1500-2500m', '>2500m']
    ).astype(str)
    
    # Renombrar columnas
    df_geo.rename(columns={
        'HV024': 'departamento',
        'HV025': 'area_residencia',
        'HV040': 'altitud_msnm'
    }, inplace=True)
    
    # Ordenar columnas
    df_geo = df_geo[['id_geografia', 'departamento', 'region_natural', 
                     'area_residencia', 'altitud_msnm', 'rango_altitud']]
    
    logger.log(f"  ‚úì {len(df_geo)} registros creados")
    return df_geo


def crear_dim_hogar(df):
    """
    Dimensi√≥n HOGAR
    Granularidad: HHID √∫nico
    """
    logger.log("Creando DIM_HOGAR...")
    
    # Columnas de hogar
    hogar_cols = ['HHID', 'HV009', 'HV271', 'V190', 
                  'HV206', 'HV201', 'HV205', 'HV237']
    
    # Verificar columnas disponibles
    hogar_cols_disponibles = [col for col in hogar_cols if col in df.columns]
    
    # Extraer hogares √∫nicos
    df_hogar = df[hogar_cols_disponibles].drop_duplicates(subset='HHID').reset_index(drop=True)
    
    # Asignar ID
    df_hogar['id_hogar'] = range(1, len(df_hogar) + 1)
    
    # Crear categor√≠as si las columnas existen
    if 'HV009' in df_hogar.columns:
        df_hogar['categoria_tamano'] = pd.cut(
            df_hogar['HV009'],
            bins=[0, 3, 5, 100],
            labels=['Peque√±o', 'Mediano', 'Grande']
        ).astype(str)
    
    # Renombrar columnas
    rename_map = {
        'HV009': 'num_miembros',
        'HV271': 'indice_riqueza_num',
        'V190': 'quintil_riqueza',
        'HV206': 'tiene_electricidad',
        'HV201': 'fuente_agua',
        'HV205': 'tipo_saneamiento',
        'HV237': 'trata_agua'
    }
    
    df_hogar.rename(columns={k: v for k, v in rename_map.items() if k in df_hogar.columns}, 
                    inplace=True)
    
    # Mover id_hogar y HHID al inicio
    cols = ['id_hogar', 'HHID'] + [col for col in df_hogar.columns if col not in ['id_hogar', 'HHID']]
    df_hogar = df_hogar[cols]
    
    logger.log(f"  ‚úì {len(df_hogar)} registros creados")
    return df_hogar


def crear_dim_madre(df):
    """
    Dimensi√≥n MADRE
    Granularidad: CASEID √∫nico
    """
    logger.log("Creando DIM_MADRE...")
    
    # Columnas de madre
    madre_cols = ['CASEID', 'V012', 'V106', 'V133', 'V025']
    madre_cols_disponibles = [col for col in madre_cols if col in df.columns]
    
    # Extraer madres √∫nicas
    df_madre = df[madre_cols_disponibles].drop_duplicates(subset='CASEID').reset_index(drop=True)
    
    # Asignar ID
    df_madre['id_madre'] = range(1, len(df_madre) + 1)
    
    # Crear categor√≠as si existen las columnas
    if 'V012' in df_madre.columns:
        df_madre['rango_edad'] = pd.cut(
            df_madre['V012'],
            bins=[0, 20, 35, 100],
            labels=['<20', '20-34', '35+']
        ).astype(str)
        
        df_madre['es_madre_adolescente'] = (df_madre['V012'] < 20).astype(int)
    
    if 'V133' in df_madre.columns:
        df_madre['categoria_educacion'] = pd.cut(
            df_madre['V133'],
            bins=[-1, 6, 12, 100],
            labels=['Baja', 'Media', 'Alta']
        ).astype(str)
    
    # Renombrar
    rename_map = {
        'V012': 'edad_actual',
        'V106': 'nivel_educativo',
        'V133': 'anios_educacion',
        'V025': 'area_residencia_madre'
    }
    
    df_madre.rename(columns={k: v for k, v in rename_map.items() if k in df_madre.columns}, 
                    inplace=True)
    
    # Reordenar
    cols = ['id_madre', 'CASEID'] + [col for col in df_madre.columns if col not in ['id_madre', 'CASEID']]
    df_madre = df_madre[cols]
    
    logger.log(f"  ‚úì {len(df_madre)} registros creados")
    return df_madre


def crear_dim_nino(df):
    """
    Dimensi√≥n NI√ëO
    Granularidad: HHID + HC0 (cada ni√±o)
    """
    logger.log("Creando DIM_NINO...")
    
    # Cada registro es un ni√±o √∫nico
    nino_cols = ['HHID', 'HC0', 'HC1', 'HC27', 'BORD', 'HC70', 'HW71']
    nino_cols_disponibles = [col for col in nino_cols if col in df.columns]
    
    df_nino = df[nino_cols_disponibles].copy()
    
    # Crear ID √∫nico combinando HHID + HC0
    df_nino['id_nino'] = range(1, len(df_nino) + 1)
    
    # Crear categor√≠as
    if 'HC1' in df_nino.columns:
        df_nino['rango_edad'] = pd.cut(
            df_nino['HC1'],
            bins=[5, 12, 18, 24, 36],
            labels=['6-11m', '12-17m', '18-23m', '24-35m'],
            right=False
        ).astype(str)
    
    if 'HC70' in df_nino.columns:
        df_nino['tiene_desnutricion_cronica'] = (df_nino['HC70'] < -2).astype(int)
    
    if 'HW71' in df_nino.columns:
        df_nino['tiene_bajo_peso'] = (df_nino['HW71'] < -2).astype(int)
    
    if 'BORD' in df_nino.columns:
        df_nino['categoria_orden'] = pd.cut(
            df_nino['BORD'],
            bins=[0, 1, 3, 100],
            labels=['Primog√©nito', '2-3', '4+']
        ).astype(str)
    
    # Renombrar
    rename_map = {
        'HC1': 'edad_meses',
        'HC27': 'sexo',
        'BORD': 'orden_nacimiento'
    }
    
    df_nino.rename(columns={k: v for k, v in rename_map.items() if k in df_nino.columns}, 
                   inplace=True)
    
    # Reordenar
    cols = ['id_nino', 'HHID', 'HC0'] + [col for col in df_nino.columns 
                                          if col not in ['id_nino', 'HHID', 'HC0']]
    df_nino = df_nino[cols]
    
    logger.log(f"  ‚úì {len(df_nino)} registros creados")
    return df_nino


def crear_fact_anemia(df, dim_tiempo, dim_geo, dim_hogar, dim_madre, dim_nino):
    """
    Tabla de HECHOS - FACT_ANEMIA
    """
    logger.log("Creando FACT_ANEMIA...")
    
    fact = df.copy()
    
    # 1. Mapear FK - TIEMPO (asignar a junio de cada a√±o)
    fact['id_tiempo'] = fact['ANIO'].astype(int) * 100 + 6
    
    # 2. Mapear FK - GEOGRAFIA
    fact = fact.merge(
        dim_geo[['id_geografia', 'departamento', 'area_residencia', 'altitud_msnm']],
        left_on=['HV024', 'HV025', 'HV040'],
        right_on=['departamento', 'area_residencia', 'altitud_msnm'],
        how='left'
    ).drop(columns=['departamento', 'area_residencia', 'altitud_msnm'])
    
    # 3. Mapear FK - HOGAR
    fact = fact.merge(
        dim_hogar[['id_hogar', 'HHID']],
        on='HHID',
        how='left'
    )
    
    # 4. Mapear FK - MADRE
    fact = fact.merge(
        dim_madre[['id_madre', 'CASEID']],
        on='CASEID',
        how='left'
    )
    
    # 5. Mapear FK - NINO
    fact = fact.merge(
        dim_nino[['id_nino', 'HHID', 'HC0']],
        on=['HHID', 'HC0'],
        how='left',
        suffixes=('', '_nino')
    )
    
    # 6. Seleccionar columnas de FACT
    fact_cols = [
        # FK
        'id_nino', 'id_tiempo', 'id_geografia', 'id_hogar', 'id_madre',
        
        # IDs originales (trazabilidad)
        'HHID', 'CASEID', 'HC0',
        
        # M√©tricas antropom√©tricas
        'HW2', 'HW3', 'HC70', 'HW70', 'HW71', 'HW72', 'HW73',
        
        # Target
        'ANEMIA', 'HC57',
        
        # Pesos
        'PESO',
        
        # Flags de calidad
        'HC55', 'HV015', 'HV103'
    ]
    
    # Filtrar solo columnas existentes
    fact_cols_disponibles = [col for col in fact_cols if col in fact.columns]
    fact = fact[fact_cols_disponibles]
    
    # 7. Renombrar columnas
    rename_map = {
        'HW2': 'peso_kg',
        'HW3': 'talla_cm',
        'HC70': 'z_talla_edad',
        'HW70': 'z_talla_edad_alt',
        'HW71': 'z_peso_edad',
        'HW72': 'z_peso_talla',
        'HW73': 'z_imc',
        'ANEMIA': 'tiene_anemia',
        'HC57': 'nivel_anemia',
        'PESO': 'peso_muestral',
        'HC55': 'medicion_valida',
        'HV015': 'cuestionario_ok',
        'HV103': 'durmio_anoche'
    }
    
    fact.rename(columns={k: v for k, v in rename_map.items() if k in fact.columns}, 
                inplace=True)
    
    logger.log(f"  ‚úì {len(fact)} registros creados")
    
    return fact

# ============================================================
# VALIDACIONES
# ============================================================

def validar_integridad_referencial(fact, dims):
    """Validar FK en tabla de hechos"""
    logger.log("\nValidando integridad referencial...")
    
    errores = []
    
    # Verificar cada FK
    checks = [
        ('id_tiempo', dims['dim_tiempo'], 'id_tiempo'),
        ('id_geografia', dims['dim_geografia'], 'id_geografia'),
        ('id_hogar', dims['dim_hogar'], 'id_hogar'),
        ('id_madre', dims['dim_madre'], 'id_madre'),
        ('id_nino', dims['dim_nino'], 'id_nino')
    ]
    
    for fk_col, dim_df, dim_pk in checks:
        if fk_col in fact.columns:
            invalidos = ~fact[fk_col].isin(dim_df[dim_pk])
            n_invalidos = invalidos.sum()
            
            if n_invalidos > 0:
                errores.append(f"  ‚ùå {fk_col}: {n_invalidos} FK inv√°lidos")
            else:
                logger.log(f"  ‚úì {fk_col}: OK")
    
    if errores:
        logger.log("\n‚ö†Ô∏è  ERRORES DE INTEGRIDAD ENCONTRADOS:")
        for error in errores:
            logger.log(error)
        raise ValueError("Integridad referencial violada")
    else:
        logger.log("  ‚úÖ Integridad referencial OK")


def validar_calidad_datos(fact, dims):
    """Validar calidad de datos"""
    logger.log("\nValidando calidad de datos...")
    
    # 1. Registros en fact
    logger.log(f"  Total registros FACT: {len(fact):,}")
    
    # 2. Duplicados
    duplicados = fact.duplicated(subset=['id_nino', 'id_tiempo']).sum()
    if duplicados > 0:
        logger.log(f"  ‚ö†Ô∏è  Duplicados encontrados: {duplicados}")
    else:
        logger.log(f"  ‚úì Sin duplicados")
    
    # 3. Missings en FK
    for col in ['id_nino', 'id_tiempo', 'id_geografia', 'id_hogar', 'id_madre']:
        if col in fact.columns:
            missing = fact[col].isna().sum()
            if missing > 0:
                logger.log(f"  ‚ö†Ô∏è  {col}: {missing} missings ({missing/len(fact)*100:.1f}%)")
    
    # 4. Estad√≠sticas de dimensiones
    logger.log(f"\nDimensiones:")
    logger.log(f"  DIM_TIEMPO:    {len(dims['dim_tiempo']):>6,} registros")
    logger.log(f"  DIM_GEOGRAFIA: {len(dims['dim_geografia']):>6,} registros")
    logger.log(f"  DIM_HOGAR:     {len(dims['dim_hogar']):>6,} registros")
    logger.log(f"  DIM_MADRE:     {len(dims['dim_madre']):>6,} registros")
    logger.log(f"  DIM_NINO:      {len(dims['dim_nino']):>6,} registros")
    
    logger.log("\n  ‚úÖ Validaci√≥n de calidad completada")


# ============================================================
# GUARDAR EN SQLITE
# ============================================================

def crear_esquema_sqlite(conn):
    """Crear estructura de tablas con DDL"""
    logger.log("\nCreando esquema SQLite...")
    
    cursor = conn.cursor()
    
    # DDL para cada tabla (simplificado - SQLite infiere tipos)
    cursor.execute("DROP TABLE IF EXISTS fact_anemia")
    cursor.execute("DROP TABLE IF EXISTS dim_tiempo")
    cursor.execute("DROP TABLE IF EXISTS dim_geografia")
    cursor.execute("DROP TABLE IF EXISTS dim_hogar")
    cursor.execute("DROP TABLE IF EXISTS dim_madre")
    cursor.execute("DROP TABLE IF EXISTS dim_nino")
    
    conn.commit()
    logger.log("  ‚úì Tablas anteriores eliminadas (si exist√≠an)")


def guardar_en_sqlite(fact, dims, db_path):
    """Guardar modelo estrella en SQLite"""
    logger.log(f"\nGuardando en SQLite: {db_path}")
    
    conn = sqlite3.connect(db_path)
    
    # Crear esquema
    crear_esquema_sqlite(conn)
    
    # Guardar dimensiones
    logger.log("  Guardando dimensiones...")
    dims['dim_tiempo'].to_sql('dim_tiempo', conn, if_exists='replace', index=False)
    dims['dim_geografia'].to_sql('dim_geografia', conn, if_exists='replace', index=False)
    dims['dim_hogar'].to_sql('dim_hogar', conn, if_exists='replace', index=False)
    dims['dim_madre'].to_sql('dim_madre', conn, if_exists='replace', index=False)
    dims['dim_nino'].to_sql('dim_nino', conn, if_exists='replace', index=False)
    
    # Guardar hechos (en chunks)
    logger.log("  Guardando tabla de hechos...")
    fact.to_sql('fact_anemia', conn, if_exists='replace', 
                index=False, chunksize=config.CHUNK_SIZE)
    
    # Crear √≠ndices
    logger.log("  Creando √≠ndices...")
    cursor = conn.cursor()
    
    indices = [
        "CREATE INDEX idx_fact_tiempo ON fact_anemia(id_tiempo)",
        "CREATE INDEX idx_fact_geografia ON fact_anemia(id_geografia)",
        "CREATE INDEX idx_fact_hogar ON fact_anemia(id_hogar)",
        "CREATE INDEX idx_fact_madre ON fact_anemia(id_madre)",
        "CREATE INDEX idx_fact_nino ON fact_anemia(id_nino)",
        "CREATE INDEX idx_fact_anemia ON fact_anemia(tiene_anemia)",
        "CREATE INDEX idx_dim_geo_dept ON dim_geografia(departamento)",
        "CREATE INDEX idx_dim_tiempo_anio ON dim_tiempo(anio)"
    ]
    
    for idx_sql in indices:
        try:
            cursor.execute(idx_sql)
        except Exception as e:
            logger.log(f"    ‚ö†Ô∏è  Error creando √≠ndice: {e}")
    
    conn.commit()
    conn.close()
    
    logger.log("  ‚úÖ Guardado en SQLite completado")


def guardar_backup_parquet(fact, dims):
    """Backup en Parquet"""
    if not config.BACKUP_PARQUET:
        return
    
    logger.log(f"\nCreando backup Parquet en {config.BACKUP_DIR}...")
    
    fact.to_parquet(config.BACKUP_DIR / "fact_anemia.parquet", index=False)
    dims['dim_tiempo'].to_parquet(config.BACKUP_DIR / "dim_tiempo.parquet", index=False)
    dims['dim_geografia'].to_parquet(config.BACKUP_DIR / "dim_geografia.parquet", index=False)
    dims['dim_hogar'].to_parquet(config.BACKUP_DIR / "dim_hogar.parquet", index=False)
    dims['dim_madre'].to_parquet(config.BACKUP_DIR / "dim_madre.parquet", index=False)
    dims['dim_nino'].to_parquet(config.BACKUP_DIR / "dim_nino.parquet", index=False)
    
    logger.log("  ‚úÖ Backup Parquet completado")


def generar_metadata(fact, dims):
    """Generar metadata del warehouse"""
    logger.log("\nGenerando metadata...")
    
    metadata = {
        'fecha_creacion': datetime.now().isoformat(),
        'registros': {
            'fact_anemia': len(fact),
            'dim_tiempo': len(dims['dim_tiempo']),
            'dim_geografia': len(dims['dim_geografia']),
            'dim_hogar': len(dims['dim_hogar']),
            'dim_madre': len(dims['dim_madre']),
            'dim_nino': len(dims['dim_nino'])
        },
        'columnas': {
            'fact_anemia': list(fact.columns),
            'dim_tiempo': list(dims['dim_tiempo'].columns),
            'dim_geografia': list(dims['dim_geografia'].columns),
            'dim_hogar': list(dims['dim_hogar'].columns),
            'dim_madre': list(dims['dim_madre'].columns),
            'dim_nino': list(dims['dim_nino'].columns)
        },
        'prevalencia_anemia': float(fact['tiene_anemia'].mean() * 100) if 'tiene_anemia' in fact.columns else None
    }
    
    # Guardar como JSON
    with open(config.METADATA_DIR / 'estadisticas.json', 'w', encoding='utf-8') as f:
        json.dump(metadata, f, indent=2, ensure_ascii=False)
    
    logger.log("  ‚úì Metadata guardada")


# ============================================================
# PIPELINE PRINCIPAL
# ============================================================

def main():
    """Pipeline completo ETL"""
    
    logger.log("="*60)
    logger.log("INICIANDO GENERACI√ìN DE MODELO ESTRELLA")
    logger.log("="*60)
    
    try:
        # 1. EXTRACT
        logger.log(f"\n1. Cargando CSV: {config.CSV_INPUT}")
        df = pd.read_csv(config.CSV_INPUT, encoding='utf-8-sig')
        logger.log(f"   ‚úì {len(df):,} registros cargados")
        logger.log(f"   ‚úì {df.shape[1]} columnas")
        
        # 2. TRANSFORM - Crear dimensiones
        logger.log("\n2. Creando dimensiones...")
        dim_tiempo = crear_dim_tiempo(df)
        dim_geografia = crear_dim_geografia(df)
        dim_hogar = crear_dim_hogar(df)
        dim_madre = crear_dim_madre(df)
        dim_nino = crear_dim_nino(df)
        
        dims = {
            'dim_tiempo': dim_tiempo,
            'dim_geografia': dim_geografia,
            'dim_hogar': dim_hogar,
            'dim_madre': dim_madre,
            'dim_nino': dim_nino
        }
        
        # 3. TRANSFORM - Crear hechos
        logger.log("\n3. Creando tabla de hechos...")
        fact = crear_fact_anemia(df, dim_tiempo, dim_geografia, 
                                dim_hogar, dim_madre, dim_nino)
        
        # 4. VALIDACIONES
        logger.log("\n4. Validando modelo...")
        validar_integridad_referencial(fact, dims)
        validar_calidad_datos(fact, dims)
        
        # 5. LOAD
        logger.log("\n5. Cargando a Data Warehouse...")
        guardar_en_sqlite(fact, dims, config.DB_OUTPUT)
        guardar_backup_parquet(fact, dims)
        generar_metadata(fact, dims)
        
        # FIN
        logger.log("\n" + "="*60)
        logger.log("‚úÖ MODELO ESTRELLA GENERADO EXITOSAMENTE")
        logger.log("="*60)
        logger.log(f"\nüìÅ Archivos generados:")
        logger.log(f"   ‚Ä¢ SQLite:  {config.DB_OUTPUT}")
        logger.log(f"   ‚Ä¢ Metadata: {config.METADATA_DIR}")
        logger.log(f"   ‚Ä¢ Backup:   {config.BACKUP_DIR}")
        
        # Estad√≠sticas finales
        tamanio_db = Path(config.DB_OUTPUT).stat().st_size / (1024*1024)
        logger.log(f"\nüìä Estad√≠sticas:")
        logger.log(f"   ‚Ä¢ Tama√±o DB: {tamanio_db:.2f} MB")
        logger.log(f"   ‚Ä¢ Total registros FACT: {len(fact):,}")
        logger.log(f"   ‚Ä¢ Prevalencia anemia: {fact['tiene_anemia'].mean()*100:.2f}%")
        
    except Exception as e:
        logger.log(f"\n‚ùå ERROR: {str(e)}")
        import traceback
        logger.log(traceback.format_exc())
        raise
    
    finally:
        logger.finalizar()


if __name__ == "__main__":
    main()

[2025-11-04 00:02:52] INICIANDO GENERACI√ìN DE MODELO ESTRELLA
[2025-11-04 00:02:52] 
1. Cargando CSV: D:\Bases_train_test\endes_2015_2024_consolidado.csv
[2025-11-04 00:02:54]    ‚úì 131,470 registros cargados
[2025-11-04 00:02:54]    ‚úì 42 columnas
[2025-11-04 00:02:54] 
2. Creando dimensiones...
[2025-11-04 00:02:54] Creando DIM_TIEMPO...
[2025-11-04 00:02:54]   ‚úì 120 registros creados
[2025-11-04 00:02:54] Creando DIM_GEOGRAFIA...
[2025-11-04 00:02:54]   ‚úì 8090 registros creados
[2025-11-04 00:02:54] Creando DIM_HOGAR...
[2025-11-04 00:02:54]   ‚úì 94921 registros creados
[2025-11-04 00:02:54] Creando DIM_MADRE...
[2025-11-04 00:02:54]   ‚úì 99928 registros creados
[2025-11-04 00:02:54] Creando DIM_NINO...
[2025-11-04 00:02:54]   ‚úì 131470 registros creados
[2025-11-04 00:02:54] 
3. Creando tabla de hechos...
[2025-11-04 00:02:54] Creando FACT_ANEMIA...
[2025-11-04 00:02:55]   ‚úì 189846 registros creados
[2025-11-04 00:02:55] 
4. Validando modelo...
[2025-11-04 00:02:55] 
Va

In [3]:
"""
GENERADOR DE MODELO ESTRELLA - ANEMIA INFANTIL ENDES (VERSI√ìN CORREGIDA)
Convierte CSV consolidado ‚Üí SQLite con esquema estrella
CORRECCI√ìN: dim_tiempo ahora tiene 1 registro por a√±o (no 12 meses)
Autor: [Tu nombre]
Fecha: 2025
"""

import pandas as pd
import numpy as np
import sqlite3
from pathlib import Path
from datetime import datetime
import json
import warnings
warnings.filterwarnings('ignore')

# ============================================================
# CONFIGURACI√ìN
# ============================================================
class Config:
    # Rutas
    CSV_INPUT = r"D:\Bases_train_test\endes_2015_2024_consolidado.csv"
    DB_OUTPUT = r"D:\Data_Warehouse\anemia_dwh.db"
    METADATA_DIR = Path(r"D:\Data_Warehouse\metadata")
    BACKUP_DIR = Path(r"D:\Data_Warehouse\backup")
    
    # Crear directorios si no existen
    METADATA_DIR.mkdir(parents=True, exist_ok=True)
    BACKUP_DIR.mkdir(parents=True, exist_ok=True)
    
    # Configuraci√≥n de procesamiento
    CHUNK_SIZE = 5000  # Para inserci√≥n en SQLite
    BACKUP_PARQUET = True  # Crear backup en parquet

config = Config()

# ============================================================
# UTILIDADES
# ============================================================
class Logger:
    """Logger simple para ETL"""
    def __init__(self, log_file):
        self.log_file = log_file
        self.start_time = datetime.now()
        
    def log(self, mensaje):
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        msg = f"[{timestamp}] {mensaje}"
        print(msg)
        with open(self.log_file, 'a', encoding='utf-8') as f:
            f.write(msg + '\n')
    
    def finalizar(self):
        duracion = (datetime.now() - self.start_time).total_seconds()
        self.log(f"\n{'='*60}")
        self.log(f"Proceso completado en {duracion:.2f} segundos")
        self.log(f"{'='*60}")

# Inicializar logger
log_file = config.METADATA_DIR / f"log_etl_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
logger = Logger(log_file)

# ============================================================
# FUNCIONES DE DIMENSIONES
# ============================================================

def crear_dim_tiempo(df):
    """
    Dimensi√≥n TIEMPO - CORREGIDA
    Granularidad: 1 registro por A√ëO (no por mes)
    """
    logger.log("Creando DIM_TIEMPO...")
    
    anios = sorted(df['ANIO'].dropna().unique())
    
    dim_tiempo = []
    for anio in anios:
        anio_int = int(anio)
        dim_tiempo.append({
            'id_tiempo': anio_int * 100 + 6,  # Formato: 201506 (junio representativo)
            'anio': anio_int,
            'mes': 6,  # Mes representativo (junio - mitad del a√±o)
            'trimestre': 2,  # Q2
            'semestre': 1,   # S1
            'nombre_mes': 'Junio',
            'quinquenio': f"{(anio_int//5)*5}-{(anio_int//5)*5+4}",
            'periodo_covid': int(anio_int in [2020, 2021])
        })
    
    df_tiempo = pd.DataFrame(dim_tiempo)
    logger.log(f"  ‚úì {len(df_tiempo)} registros creados (1 por a√±o)")
    return df_tiempo


def crear_dim_geografia(df):
    """
    Dimensi√≥n GEOGRAF√çA
    Combina: departamento + √°rea + altitud
    """
    logger.log("Creando DIM_GEOGRAFIA...")
    
    # Seleccionar columnas geogr√°ficas
    geo_cols = ['HV024', 'HV025', 'HV040']
    
    # Verificar que existan
    geo_cols_disponibles = [col for col in geo_cols if col in df.columns]
    
    if not geo_cols_disponibles:
        logger.log("  ‚ö†Ô∏è No se encontraron columnas geogr√°ficas")
        return pd.DataFrame()
    
    # Combinaciones √∫nicas
    df_geo = df[geo_cols_disponibles].drop_duplicates().reset_index(drop=True)
    
    # Asignar ID
    df_geo['id_geografia'] = range(1, len(df_geo) + 1)
    
    # Mapeo regi√≥n natural (ajustar seg√∫n tus datos reales)
    mapa_region = {
        'tumbes': 'Costa', 'piura': 'Costa', 'lambayeque': 'Costa', 
        'la libertad': 'Costa', 'ancash': 'Costa', 'lima': 'Costa',
        'ica': 'Costa', 'arequipa': 'Costa', 'moquegua': 'Costa', 'tacna': 'Costa',
        'callao': 'Costa', 'lima metropolitana': 'Costa',
        'cajamarca': 'Sierra', 'huanuco': 'Sierra', 'pasco': 'Sierra',
        'junin': 'Sierra', 'huancavelica': 'Sierra', 'ayacucho': 'Sierra',
        'apurimac': 'Sierra', 'cusco': 'Sierra', 'puno': 'Sierra',
        'loreto': 'Selva', 'amazonas': 'Selva', 'san martin': 'Selva',
        'ucayali': 'Selva', 'madre de dios': 'Selva'
    }
    
    if 'HV024' in df_geo.columns:
        df_geo['region_natural'] = df_geo['HV024'].str.lower().str.strip().map(mapa_region)
        df_geo['region_natural'] = df_geo['region_natural'].fillna('Otro')
    
    # Categorizar altitud
    if 'HV040' in df_geo.columns:
        df_geo['rango_altitud'] = pd.cut(
            df_geo['HV040'],
            bins=[-1, 500, 1500, 2500, 5000],
            labels=['<500m', '500-1500m', '1500-2500m', '>2500m']
        ).astype(str)
    
    # Renombrar columnas
    rename_map = {
        'HV024': 'departamento',
        'HV025': 'area_residencia',
        'HV040': 'altitud_msnm'
    }
    df_geo.rename(columns={k: v for k, v in rename_map.items() if k in df_geo.columns}, 
                  inplace=True)
    
    # Ordenar columnas
    columnas_ordenadas = ['id_geografia', 'departamento', 'region_natural', 
                          'area_residencia', 'altitud_msnm', 'rango_altitud']
    columnas_disponibles = [col for col in columnas_ordenadas if col in df_geo.columns]
    df_geo = df_geo[columnas_disponibles]
    
    logger.log(f"  ‚úì {len(df_geo)} registros creados")
    return df_geo


def crear_dim_hogar(df):
    """
    Dimensi√≥n HOGAR
    Granularidad: HHID √∫nico
    """
    logger.log("Creando DIM_HOGAR...")
    
    # Columnas de hogar
    hogar_cols = ['HHID', 'HV009', 'HV271', 'V190', 
                  'HV206', 'HV201', 'HV205', 'HV237']
    
    # Verificar columnas disponibles
    hogar_cols_disponibles = [col for col in hogar_cols if col in df.columns]
    
    if 'HHID' not in hogar_cols_disponibles:
        logger.log("  ‚ö†Ô∏è Columna HHID no encontrada")
        return pd.DataFrame()
    
    # Extraer hogares √∫nicos
    df_hogar = df[hogar_cols_disponibles].drop_duplicates(subset='HHID').reset_index(drop=True)
    
    # Asignar ID
    df_hogar['id_hogar'] = range(1, len(df_hogar) + 1)
    
    # Crear categor√≠as si las columnas existen
    if 'HV009' in df_hogar.columns:
        df_hogar['categoria_tamano'] = pd.cut(
            df_hogar['HV009'],
            bins=[0, 3, 5, 100],
            labels=['Peque√±o', 'Mediano', 'Grande']
        ).astype(str)
    
    # Renombrar columnas
    rename_map = {
        'HV009': 'num_miembros',
        'HV271': 'indice_riqueza_num',
        'V190': 'quintil_riqueza',
        'HV206': 'tiene_electricidad',
        'HV201': 'fuente_agua',
        'HV205': 'tipo_saneamiento',
        'HV237': 'trata_agua'
    }
    
    df_hogar.rename(columns={k: v for k, v in rename_map.items() if k in df_hogar.columns}, 
                    inplace=True)
    
    # Mover id_hogar y HHID al inicio
    cols = ['id_hogar', 'HHID'] + [col for col in df_hogar.columns if col not in ['id_hogar', 'HHID']]
    df_hogar = df_hogar[cols]
    
    logger.log(f"  ‚úì {len(df_hogar)} registros creados")
    return df_hogar


def crear_dim_madre(df):
    """
    Dimensi√≥n MADRE
    Granularidad: CASEID √∫nico
    """
    logger.log("Creando DIM_MADRE...")
    
    # Columnas de madre
    madre_cols = ['CASEID', 'V012', 'V106', 'V133', 'V025']
    madre_cols_disponibles = [col for col in madre_cols if col in df.columns]
    
    if 'CASEID' not in madre_cols_disponibles:
        logger.log("  ‚ö†Ô∏è Columna CASEID no encontrada")
        return pd.DataFrame()
    
    # Extraer madres √∫nicas
    df_madre = df[madre_cols_disponibles].drop_duplicates(subset='CASEID').reset_index(drop=True)
    
    # Asignar ID
    df_madre['id_madre'] = range(1, len(df_madre) + 1)
    
    # Crear categor√≠as si existen las columnas
    if 'V012' in df_madre.columns:
        df_madre['rango_edad'] = pd.cut(
            df_madre['V012'],
            bins=[0, 20, 35, 100],
            labels=['<20', '20-34', '35+']
        ).astype(str)
        
        df_madre['es_madre_adolescente'] = (df_madre['V012'] < 20).astype(int)
    
    if 'V133' in df_madre.columns:
        df_madre['categoria_educacion'] = pd.cut(
            df_madre['V133'],
            bins=[-1, 6, 12, 100],
            labels=['Baja', 'Media', 'Alta']
        ).astype(str)
    
    # Renombrar
    rename_map = {
        'V012': 'edad_actual',
        'V106': 'nivel_educativo',
        'V133': 'anios_educacion',
        'V025': 'area_residencia_madre'
    }
    
    df_madre.rename(columns={k: v for k, v in rename_map.items() if k in df_madre.columns}, 
                    inplace=True)
    
    # Reordenar
    cols = ['id_madre', 'CASEID'] + [col for col in df_madre.columns if col not in ['id_madre', 'CASEID']]
    df_madre = df_madre[cols]
    
    logger.log(f"  ‚úì {len(df_madre)} registros creados")
    return df_madre


def crear_dim_nino(df):
    """
    Dimensi√≥n NI√ëO
    Granularidad: HHID + HC0 (cada ni√±o)
    """
    logger.log("Creando DIM_NINO...")
    
    # Cada registro es un ni√±o √∫nico
    nino_cols = ['HHID', 'HC0', 'HC1', 'HC27', 'BORD', 'HC70', 'HW71']
    nino_cols_disponibles = [col for col in nino_cols if col in df.columns]
    
    if 'HHID' not in nino_cols_disponibles or 'HC0' not in nino_cols_disponibles:
        logger.log("  ‚ö†Ô∏è Columnas HHID o HC0 no encontradas")
        return pd.DataFrame()
    
    df_nino = df[nino_cols_disponibles].copy()
    
    # Crear ID √∫nico
    df_nino['id_nino'] = range(1, len(df_nino) + 1)
    
    # Crear categor√≠as
    if 'HC1' in df_nino.columns:
        df_nino['rango_edad'] = pd.cut(
            df_nino['HC1'],
            bins=[5, 12, 18, 24, 36],
            labels=['6-11m', '12-17m', '18-23m', '24-35m'],
            right=False
        ).astype(str)
    
    if 'HC70' in df_nino.columns:
        df_nino['tiene_desnutricion_cronica'] = (df_nino['HC70'] < -2).astype(int)
    
    if 'HW71' in df_nino.columns:
        df_nino['tiene_bajo_peso'] = (df_nino['HW71'] < -2).astype(int)
    
    if 'BORD' in df_nino.columns:
        df_nino['categoria_orden'] = pd.cut(
            df_nino['BORD'],
            bins=[0, 1, 3, 100],
            labels=['Primog√©nito', '2-3', '4+']
        ).astype(str)
    
    # Renombrar
    rename_map = {
        'HC1': 'edad_meses',
        'HC27': 'sexo',
        'BORD': 'orden_nacimiento'
    }
    
    df_nino.rename(columns={k: v for k, v in rename_map.items() if k in df_nino.columns}, 
                   inplace=True)
    
    # Reordenar
    cols = ['id_nino', 'HHID', 'HC0'] + [col for col in df_nino.columns 
                                          if col not in ['id_nino', 'HHID', 'HC0']]
    df_nino = df_nino[cols]
    
    logger.log(f"  ‚úì {len(df_nino)} registros creados")
    return df_nino


def crear_fact_anemia(df, dim_tiempo, dim_geo, dim_hogar, dim_madre, dim_nino):
    """
    Tabla de HECHOS - FACT_ANEMIA
    CORREGIDO: Mapeo 1:1 con dim_tiempo (1 registro por a√±o)
    """
    logger.log("Creando FACT_ANEMIA...")
    
    fact = df.copy()
    
    # 1. Mapear FK - TIEMPO (CORREGIDO: asignar a junio de cada a√±o)
    if 'ANIO' in fact.columns:
        fact['id_tiempo'] = fact['ANIO'].astype(int) * 100 + 6
    else:
        logger.log("  ‚ö†Ô∏è Columna ANIO no encontrada")
        return pd.DataFrame()
    
    # 2. Mapear FK - GEOGRAFIA
    if not dim_geo.empty and 'HV024' in fact.columns:
        merge_cols = [col for col in ['HV024', 'HV025', 'HV040'] if col in fact.columns]
        dim_merge_cols = [col.replace('HV024', 'departamento')
                          .replace('HV025', 'area_residencia')
                          .replace('HV040', 'altitud_msnm') for col in merge_cols]
        
        fact = fact.merge(
            dim_geo[['id_geografia'] + dim_merge_cols],
            left_on=merge_cols,
            right_on=dim_merge_cols,
            how='left'
        )
        
        # Eliminar columnas duplicadas del merge
        for col in dim_merge_cols:
            if col in fact.columns and col not in ['id_geografia']:
                fact.drop(columns=[col], inplace=True)
    
    # 3. Mapear FK - HOGAR
    if not dim_hogar.empty and 'HHID' in fact.columns:
        fact = fact.merge(
            dim_hogar[['id_hogar', 'HHID']],
            on='HHID',
            how='left'
        )
    
    # 4. Mapear FK - MADRE
    if not dim_madre.empty and 'CASEID' in fact.columns:
        fact = fact.merge(
            dim_madre[['id_madre', 'CASEID']],
            on='CASEID',
            how='left'
        )
    
    # 5. Mapear FK - NINO
    if not dim_nino.empty and 'HHID' in fact.columns and 'HC0' in fact.columns:
        fact = fact.merge(
            dim_nino[['id_nino', 'HHID', 'HC0']],
            on=['HHID', 'HC0'],
            how='left',
            suffixes=('', '_nino')
        )
    
    # 6. Seleccionar columnas de FACT
    fact_cols = [
        # FK
        'id_nino', 'id_tiempo', 'id_geografia', 'id_hogar', 'id_madre',
        
        # IDs originales (trazabilidad)
        'HHID', 'CASEID', 'HC0',
        
        # M√©tricas antropom√©tricas
        'HW2', 'HW3', 'HC70', 'HW70', 'HW71', 'HW72', 'HW73',
        
        # Target
        'ANEMIA', 'HC57',
        
        # Pesos
        'PESO',
        
        # Flags de calidad
        'HC55', 'HV015', 'HV103'
    ]
    
    # Filtrar solo columnas existentes
    fact_cols_disponibles = [col for col in fact_cols if col in fact.columns]
    fact = fact[fact_cols_disponibles]
    
    # 7. Renombrar columnas
    rename_map = {
        'HW2': 'peso_kg',
        'HW3': 'talla_cm',
        'HC70': 'z_talla_edad',
        'HW70': 'z_talla_edad_alt',
        'HW71': 'z_peso_edad',
        'HW72': 'z_peso_talla',
        'HW73': 'z_imc',
        'ANEMIA': 'tiene_anemia',
        'HC57': 'nivel_anemia',
        'PESO': 'peso_muestral',
        'HC55': 'medicion_valida',
        'HV015': 'cuestionario_ok',
        'HV103': 'durmio_anoche'
    }
    
    fact.rename(columns={k: v for k, v in rename_map.items() if k in fact.columns}, 
                inplace=True)
    
    logger.log(f"  ‚úì {len(fact)} registros creados")
    
    return fact

# ============================================================
# VALIDACIONES
# ============================================================

def validar_integridad_referencial(fact, dims):
    """Validar FK en tabla de hechos"""
    logger.log("\nValidando integridad referencial...")
    
    errores = []
    
    # Verificar cada FK
    checks = [
        ('id_tiempo', dims['dim_tiempo'], 'id_tiempo'),
        ('id_geografia', dims['dim_geografia'], 'id_geografia'),
        ('id_hogar', dims['dim_hogar'], 'id_hogar'),
        ('id_madre', dims['dim_madre'], 'id_madre'),
        ('id_nino', dims['dim_nino'], 'id_nino')
    ]
    
    for fk_col, dim_df, dim_pk in checks:
        if fk_col in fact.columns and not dim_df.empty:
            invalidos = ~fact[fk_col].isin(dim_df[dim_pk])
            n_invalidos = invalidos.sum()
            
            if n_invalidos > 0:
                errores.append(f"  ‚ùå {fk_col}: {n_invalidos} FK inv√°lidos")
            else:
                logger.log(f"  ‚úì {fk_col}: OK")
    
    if errores:
        logger.log("\n‚ö†Ô∏è  ERRORES DE INTEGRIDAD ENCONTRADOS:")
        for error in errores:
            logger.log(error)
        raise ValueError("Integridad referencial violada")
    else:
        logger.log("  ‚úÖ Integridad referencial OK")


def validar_calidad_datos(fact, dims):
    """Validar calidad de datos"""
    logger.log("\nValidando calidad de datos...")
    
    # 1. Registros en fact
    logger.log(f"  Total registros FACT: {len(fact):,}")
    
    # 2. Duplicados
    if 'id_nino' in fact.columns and 'id_tiempo' in fact.columns:
        duplicados = fact.duplicated(subset=['id_nino', 'id_tiempo']).sum()
        if duplicados > 0:
            logger.log(f"  ‚ö†Ô∏è  Duplicados encontrados: {duplicados}")
        else:
            logger.log(f"  ‚úì Sin duplicados")
    
    # 3. Missings en FK
    for col in ['id_nino', 'id_tiempo', 'id_geografia', 'id_hogar', 'id_madre']:
        if col in fact.columns:
            missing = fact[col].isna().sum()
            if missing > 0:
                logger.log(f"  ‚ö†Ô∏è  {col}: {missing} missings ({missing/len(fact)*100:.1f}%)")
    
    # 4. Estad√≠sticas de dimensiones
    logger.log(f"\nDimensiones:")
    logger.log(f"  DIM_TIEMPO:    {len(dims['dim_tiempo']):>6,} registros")
    logger.log(f"  DIM_GEOGRAFIA: {len(dims['dim_geografia']):>6,} registros")
    logger.log(f"  DIM_HOGAR:     {len(dims['dim_hogar']):>6,} registros")
    logger.log(f"  DIM_MADRE:     {len(dims['dim_madre']):>6,} registros")
    logger.log(f"  DIM_NINO:      {len(dims['dim_nino']):>6,} registros")
    
    logger.log("\n  ‚úÖ Validaci√≥n de calidad completada")


# ============================================================
# GUARDAR EN SQLITE
# ============================================================

def crear_esquema_sqlite(conn):
    """Crear estructura de tablas con DDL"""
    logger.log("\nCreando esquema SQLite...")
    
    cursor = conn.cursor()
    
    # DDL para cada tabla (simplificado - SQLite infiere tipos)
    cursor.execute("DROP TABLE IF EXISTS fact_anemia")
    cursor.execute("DROP TABLE IF EXISTS dim_tiempo")
    cursor.execute("DROP TABLE IF EXISTS dim_geografia")
    cursor.execute("DROP TABLE IF EXISTS dim_hogar")
    cursor.execute("DROP TABLE IF EXISTS dim_madre")
    cursor.execute("DROP TABLE IF EXISTS dim_nino")
    
    conn.commit()
    logger.log("  ‚úì Tablas anteriores eliminadas (si exist√≠an)")


def guardar_en_sqlite(fact, dims, db_path):
    """Guardar modelo estrella en SQLite"""
    logger.log(f"\nGuardando en SQLite: {db_path}")
    
    conn = sqlite3.connect(db_path)
    
    # Crear esquema
    crear_esquema_sqlite(conn)
    
    # Guardar dimensiones
    logger.log("  Guardando dimensiones...")
    dims['dim_tiempo'].to_sql('dim_tiempo', conn, if_exists='replace', index=False)
    dims['dim_geografia'].to_sql('dim_geografia', conn, if_exists='replace', index=False)
    dims['dim_hogar'].to_sql('dim_hogar', conn, if_exists='replace', index=False)
    dims['dim_madre'].to_sql('dim_madre', conn, if_exists='replace', index=False)
    dims['dim_nino'].to_sql('dim_nino', conn, if_exists='replace', index=False)
    
    # Guardar hechos (en chunks)
    logger.log("  Guardando tabla de hechos...")
    fact.to_sql('fact_anemia', conn, if_exists='replace', 
                index=False, chunksize=config.CHUNK_SIZE)
    
    # Crear √≠ndices
    logger.log("  Creando √≠ndices...")
    cursor = conn.cursor()
    
    indices = [
        "CREATE INDEX idx_fact_tiempo ON fact_anemia(id_tiempo)",
        "CREATE INDEX idx_fact_geografia ON fact_anemia(id_geografia)",
        "CREATE INDEX idx_fact_hogar ON fact_anemia(id_hogar)",
        "CREATE INDEX idx_fact_madre ON fact_anemia(id_madre)",
        "CREATE INDEX idx_fact_nino ON fact_anemia(id_nino)",
        "CREATE INDEX idx_fact_anemia ON fact_anemia(tiene_anemia)",
        "CREATE INDEX idx_dim_geo_dept ON dim_geografia(departamento)",
        "CREATE INDEX idx_dim_tiempo_anio ON dim_tiempo(anio)"
    ]
    
    for idx_sql in indices:
        try:
            cursor.execute(idx_sql)
        except Exception as e:
            logger.log(f"    ‚ö†Ô∏è  Error creando √≠ndice: {e}")
    
    conn.commit()
    conn.close()
    
    logger.log("  ‚úÖ Guardado en SQLite completado")


def guardar_backup_parquet(fact, dims):
    """Backup en Parquet"""
    if not config.BACKUP_PARQUET:
        return
    
    logger.log(f"\nCreando backup Parquet en {config.BACKUP_DIR}...")
    
    fact.to_parquet(config.BACKUP_DIR / "fact_anemia.parquet", index=False)
    dims['dim_tiempo'].to_parquet(config.BACKUP_DIR / "dim_tiempo.parquet", index=False)
    dims['dim_geografia'].to_parquet(config.BACKUP_DIR / "dim_geografia.parquet", index=False)
    dims['dim_hogar'].to_parquet(config.BACKUP_DIR / "dim_hogar.parquet", index=False)
    dims['dim_madre'].to_parquet(config.BACKUP_DIR / "dim_madre.parquet", index=False)
    dims['dim_nino'].to_parquet(config.BACKUP_DIR / "dim_nino.parquet", index=False)
    
    logger.log("  ‚úÖ Backup Parquet completado")


def generar_metadata(fact, dims):
    """Generar metadata del warehouse"""
    logger.log("\nGenerando metadata...")
    
    metadata = {
        'fecha_creacion': datetime.now().isoformat(),
        'version': 'v2.0_corregida',
        'correcciones': 'dim_tiempo con 1 registro por a√±o (no 12 meses)',
        'registros': {
            'fact_anemia': len(fact),
            'dim_tiempo': len(dims['dim_tiempo']),
            'dim_geografia': len(dims['dim_geografia']),
            'dim_hogar': len(dims['dim_hogar']),
            'dim_madre': len(dims['dim_madre']),
            'dim_nino': len(dims['dim_nino'])
        },
        'columnas': {
            'fact_anemia': list(fact.columns),
            'dim_tiempo': list(dims['dim_tiempo'].columns),
            'dim_geografia': list(dims['dim_geografia'].columns),
            'dim_hogar': list(dims['dim_hogar'].columns),
            'dim_madre': list(dims['dim_madre'].columns),
            'dim_nino': list(dims['dim_nino'].columns)
        },
        'prevalencia_anemia': float(fact['tiene_anemia'].mean() * 100) if 'tiene_anemia' in fact.columns else None
    }
    
    # Guardar como JSON
    with open(config.METADATA_DIR / 'estadisticas.json', 'w', encoding='utf-8') as f:
        json.dump(metadata, f, indent=2, ensure_ascii=False)
    
    logger.log("  ‚úì Metadata guardada")


# ============================================================
# PIPELINE PRINCIPAL
# ============================================================

def main():
    """Pipeline completo ETL"""
    
    logger.log("="*60)
    logger.log("INICIANDO GENERACI√ìN DE MODELO ESTRELLA (VERSI√ìN CORREGIDA)")
    logger.log("="*60)
    
    try:
        # 1. EXTRACT
        logger.log(f"\n1. Cargando CSV: {config.CSV_INPUT}")
        df = pd.read_csv(config.CSV_INPUT, encoding='utf-8-sig')
        logger.log(f"   ‚úì {len(df):,} registros cargados")
        logger.log(f"   ‚úì {df.shape[1]} columnas")
        
        # 2. TRANSFORM - Crear dimensiones
        logger.log("\n2. Creando dimensiones...")
        dim_tiempo = crear_dim_tiempo(df)
        dim_geografia = crear_dim_geografia(df)
        dim_hogar = crear_dim_hogar(df)
        dim_madre = crear_dim_madre(df)
        dim_nino = crear_dim_nino(df)
        
        dims = {
            'dim_tiempo': dim_tiempo,
            'dim_geografia': dim_geografia,
            'dim_hogar': dim_hogar,
            'dim_madre': dim_madre,
            'dim_nino': dim_nino
        }
        
        # 3. TRANSFORM - Crear hechos
        logger.log("\n3. Creando tabla de hechos...")
        fact = crear_fact_anemia(df, dim_tiempo, dim_geografia, 
                                dim_hogar, dim_madre, dim_nino)
        
        # 4. VALIDACIONES
        logger.log("\n4. Validando modelo...")
        validar_integridad_referencial(fact, dims)
        validar_calidad_datos(fact, dims)
        
        # 5. LOAD
        logger.log("\n5. Cargando a Data Warehouse...")
        guardar_en_sqlite(fact, dims, config.DB_OUTPUT)
        guardar_backup_parquet(fact, dims)
        generar_metadata(fact, dims)
        
        # 6. VERIFICACI√ìN FINAL
        logger.log("\n6. Verificaci√≥n final...")
        conn = sqlite3.connect(config.DB_OUTPUT)
        query_verificacion = """
        SELECT 
            t.anio,
            COUNT(*) as casos,
            SUM(f.tiene_anemia) as con_anemia,
            ROUND(AVG(f.tiene_anemia) * 100, 2) as prevalencia
        FROM fact_anemia f
        JOIN dim_tiempo t ON f.id_tiempo = t.id_tiempo
        WHERE f.tiene_anemia IS NOT NULL
        GROUP BY t.anio
        ORDER BY t.anio
        """
        df_verificacion = pd.read_sql(query_verificacion, conn)
        conn.close()
        
        logger.log("\nüìä Casos por a√±o (verificaci√≥n):")
        logger.log(df_verificacion.to_string(index=False))
        
        # FIN
        logger.log("\n" + "="*60)
        logger.log("‚úÖ MODELO ESTRELLA GENERADO EXITOSAMENTE")
        logger.log("="*60)
        logger.log(f"\nüìÅ Archivos generados:")
        logger.log(f"   ‚Ä¢ SQLite:  {config.DB_OUTPUT}")
        logger.log(f"   ‚Ä¢ Metadata: {config.METADATA_DIR}")
        logger.log(f"   ‚Ä¢ Backup:   {config.BACKUP_DIR}")
        
        # Estad√≠sticas finales
        tamanio_db = Path(config.DB_OUTPUT).stat().st_size / (1024*1024)
        logger.log(f"\nüìä Estad√≠sticas:")
        logger.log(f"   ‚Ä¢ Tama√±o DB: {tamanio_db:.2f} MB")
        logger.log(f"   ‚Ä¢ Total registros FACT: {len(fact):,}")
        if 'tiene_anemia' in fact.columns:
            logger.log(f"   ‚Ä¢ Prevalencia anemia: {fact['tiene_anemia'].mean()*100:.2f}%")
        logger.log(f"   ‚Ä¢ A√±os en dim_tiempo: {len(dim_tiempo)}")
        
    except Exception as e:
        logger.log(f"\n‚ùå ERROR: {str(e)}")
        import traceback
        logger.log(traceback.format_exc())
        raise
    
    finally:
        logger.finalizar()


if __name__ == "__main__":
    main()

[2025-11-04 01:32:11] INICIANDO GENERACI√ìN DE MODELO ESTRELLA (VERSI√ìN CORREGIDA)
[2025-11-04 01:32:11] 
1. Cargando CSV: D:\Bases_train_test\endes_2015_2024_consolidado.csv
[2025-11-04 01:32:11]    ‚úì 131,470 registros cargados
[2025-11-04 01:32:11]    ‚úì 42 columnas
[2025-11-04 01:32:11] 
2. Creando dimensiones...
[2025-11-04 01:32:11] Creando DIM_TIEMPO...
[2025-11-04 01:32:11]   ‚úì 10 registros creados (1 por a√±o)
[2025-11-04 01:32:11] Creando DIM_GEOGRAFIA...
[2025-11-04 01:32:12]   ‚úì 8090 registros creados
[2025-11-04 01:32:12] Creando DIM_HOGAR...
[2025-11-04 01:32:12]   ‚úì 94921 registros creados
[2025-11-04 01:32:12] Creando DIM_MADRE...
[2025-11-04 01:32:12]   ‚úì 99928 registros creados
[2025-11-04 01:32:12] Creando DIM_NINO...
[2025-11-04 01:32:12]   ‚úì 131470 registros creados
[2025-11-04 01:32:12] 
3. Creando tabla de hechos...
[2025-11-04 01:32:12] Creando FACT_ANEMIA...
[2025-11-04 01:32:12]   ‚úì 189846 registros creados
[2025-11-04 01:32:12] 
4. Validando mo

In [13]:
"""
GENERADOR DE MODELO ESTRELLA - ANEMIA INFANTIL ENDES
Convierte CSV consolidado ‚Üí SQLite con esquema estrella
Autor: [Tu nombre]
Fecha: 2025
"""

import pandas as pd
import numpy as np
import sqlite3
from pathlib import Path
from datetime import datetime
import json
import warnings
warnings.filterwarnings('ignore')

# ============================================================
# CONFIGURACI√ìN
# ============================================================
class Config:
    # Rutas
    CSV_INPUT = r"D:\Bases_train_test\endes_2015_2024_consolidado.csv"
    DB_OUTPUT = r"D:\Data_Warehouse\anemia_dwh.db"
    METADATA_DIR = Path(r"D:\Data_Warehouse\metadata")
    BACKUP_DIR = Path(r"D:\Data_Warehouse\backup")
    
    # Crear directorios si no existen
    METADATA_DIR.mkdir(parents=True, exist_ok=True)
    BACKUP_DIR.mkdir(parents=True, exist_ok=True)
    
    # Configuraci√≥n de procesamiento
    CHUNK_SIZE = 5000  # Para inserci√≥n en SQLite
    BACKUP_PARQUET = True  # Crear backup en parquet

config = Config()

# ============================================================
# UTILIDADES
# ============================================================
class Logger:
    """Logger simple para ETL"""
    def __init__(self, log_file):
        self.log_file = log_file
        self.start_time = datetime.now()
        
    def log(self, mensaje):
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        msg = f"[{timestamp}] {mensaje}"
        print(msg)
        with open(self.log_file, 'a', encoding='utf-8') as f:
            f.write(msg + '\n')
    
    def finalizar(self):
        duracion = (datetime.now() - self.start_time).total_seconds()
        self.log(f"\n{'='*60}")
        self.log(f"Proceso completado en {duracion:.2f} segundos")
        self.log(f"{'='*60}")

# Inicializar logger
log_file = config.METADATA_DIR / f"log_etl_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
logger = Logger(log_file)

# ============================================================
# FUNCIONES DE DIMENSIONES
# ============================================================

def crear_dim_tiempo(df):
    """
    Dimensi√≥n TIEMPO CORREGIDA
    Granularidad: 1 registro por a√±o (simplificado para ENDES)
    """
    logger.log("Creando DIM_TIEMPO (CORREGIDA - 1 registro por a√±o)...")
    
    anios = sorted(df['ANIO'].unique())
    
    dim_tiempo = []
    for anio in anios:
        # Solo 1 registro por a√±o (usamos junio como representativo)
        dim_tiempo.append({
            'id_tiempo': int(f"{anio}06"),  # Formato: A√±o + Mes 06 (junio)
            'anio': int(anio),
            'mes': 6,  # Todos en junio
            'trimestre': 2,  # Q2
            'semestre': 1,   # S1
            'nombre_mes': 'Junio',
            'quinquenio': f"{(anio//5)*5}-{(anio//5)*5+4}",
            'periodo_covid': int(anio in [2020, 2021])
        })
    
    df_tiempo = pd.DataFrame(dim_tiempo)
    logger.log(f"  ‚úì {len(df_tiempo)} registros creados (1 por a√±o)")
    return df_tiempo


def crear_dim_geografia(df):
    """
    Dimensi√≥n GEOGRAF√çA
    Combina: departamento + √°rea + altitud
    """
    logger.log("Creando DIM_GEOGRAFIA...")
    
    # Seleccionar columnas geogr√°ficas
    geo_cols = ['HV024', 'HV025', 'HV040']
    
    # Combinaciones √∫nicas
    df_geo = df[geo_cols].drop_duplicates().reset_index(drop=True)
    
    # Asignar ID
    df_geo['id_geografia'] = range(1, len(df_geo) + 1)
    
    # Mapeo regi√≥n natural (simplificado - ajustar seg√∫n necesidad)
    mapa_region = {
        'tumbes': 'Costa', 'piura': 'Costa', 'lambayeque': 'Costa', 
        'la libertad': 'Costa', 'ancash': 'Costa', 'lima': 'Costa',
        'ica': 'Costa', 'arequipa': 'Costa', 'moquegua': 'Costa', 'tacna': 'Costa',
        'cajamarca': 'Sierra', 'huanuco': 'Sierra', 'pasco': 'Sierra',
        'junin': 'Sierra', 'huancavelica': 'Sierra', 'ayacucho': 'Sierra',
        'apurimac': 'Sierra', 'cusco': 'Sierra', 'puno': 'Sierra',
        'loreto': 'Selva', 'amazonas': 'Selva', 'san martin': 'Selva',
        'ucayali': 'Selva', 'madre de dios': 'Selva'
    }
    
    df_geo['region_natural'] = df_geo['HV024'].str.lower().map(mapa_region)
    df_geo['region_natural'] = df_geo['region_natural'].fillna('Otro')
    
    # Categorizar altitud
    df_geo['rango_altitud'] = pd.cut(
        df_geo['HV040'],
        bins=[-1, 500, 1500, 2500, 5000],
        labels=['<500m', '500-1500m', '1500-2500m', '>2500m']
    ).astype(str)
    
    # Renombrar columnas
    df_geo.rename(columns={
        'HV024': 'departamento',
        'HV025': 'area_residencia',
        'HV040': 'altitud_msnm'
    }, inplace=True)
    
    # Ordenar columnas
    df_geo = df_geo[['id_geografia', 'departamento', 'region_natural', 
                     'area_residencia', 'altitud_msnm', 'rango_altitud']]
    
    logger.log(f"  ‚úì {len(df_geo)} registros creados")
    return df_geo


def crear_dim_hogar(df):
    """
    Dimensi√≥n HOGAR
    Granularidad: HHID √∫nico
    """
    logger.log("Creando DIM_HOGAR...")
    
    # Columnas de hogar
    hogar_cols = ['HHID', 'HV009', 'HV271', 'V190', 
                  'HV206', 'HV201', 'HV205', 'HV237']
    
    # Verificar columnas disponibles
    hogar_cols_disponibles = [col for col in hogar_cols if col in df.columns]
    
    # Extraer hogares √∫nicos
    df_hogar = df[hogar_cols_disponibles].drop_duplicates(subset='HHID').reset_index(drop=True)
    
    # Asignar ID
    df_hogar['id_hogar'] = range(1, len(df_hogar) + 1)
    
    # Crear categor√≠as si las columnas existen
    if 'HV009' in df_hogar.columns:
        df_hogar['categoria_tamano'] = pd.cut(
            df_hogar['HV009'],
            bins=[0, 3, 5, 100],
            labels=['Peque√±o', 'Mediano', 'Grande']
        ).astype(str)
    
    # Renombrar columnas
    rename_map = {
        'HV009': 'num_miembros',
        'HV271': 'indice_riqueza_num',
        'V190': 'quintil_riqueza',
        'HV206': 'tiene_electricidad',
        'HV201': 'fuente_agua',
        'HV205': 'tipo_saneamiento',
        'HV237': 'trata_agua'
    }
    
    df_hogar.rename(columns={k: v for k, v in rename_map.items() if k in df_hogar.columns}, 
                    inplace=True)
    
    # Mover id_hogar y HHID al inicio
    cols = ['id_hogar', 'HHID'] + [col for col in df_hogar.columns if col not in ['id_hogar', 'HHID']]
    df_hogar = df_hogar[cols]
    
    logger.log(f"  ‚úì {len(df_hogar)} registros creados")
    return df_hogar


def crear_dim_madre(df):
    """
    Dimensi√≥n MADRE
    Granularidad: CASEID √∫nico
    """
    logger.log("Creando DIM_MADRE...")
    
    # Columnas de madre
    madre_cols = ['CASEID', 'V012', 'V106', 'V133', 'V025']
    madre_cols_disponibles = [col for col in madre_cols if col in df.columns]
    
    # Extraer madres √∫nicas
    df_madre = df[madre_cols_disponibles].drop_duplicates(subset='CASEID').reset_index(drop=True)
    
    # Asignar ID
    df_madre['id_madre'] = range(1, len(df_madre) + 1)
    
    # Crear categor√≠as si existen las columnas
    if 'V012' in df_madre.columns:
        df_madre['rango_edad'] = pd.cut(
            df_madre['V012'],
            bins=[0, 20, 35, 100],
            labels=['<20', '20-34', '35+']
        ).astype(str)
        
        df_madre['es_madre_adolescente'] = (df_madre['V012'] < 20).astype(int)
    
    if 'V133' in df_madre.columns:
        df_madre['categoria_educacion'] = pd.cut(
            df_madre['V133'],
            bins=[-1, 6, 12, 100],
            labels=['Baja', 'Media', 'Alta']
        ).astype(str)
    
    # Renombrar
    rename_map = {
        'V012': 'edad_actual',
        'V106': 'nivel_educativo',
        'V133': 'anios_educacion',
        'V025': 'area_residencia_madre'
    }
    
    df_madre.rename(columns={k: v for k, v in rename_map.items() if k in df_madre.columns}, 
                    inplace=True)
    
    # Reordenar
    cols = ['id_madre', 'CASEID'] + [col for col in df_madre.columns if col not in ['id_madre', 'CASEID']]
    df_madre = df_madre[cols]
    
    logger.log(f"  ‚úì {len(df_madre)} registros creados")
    return df_madre


def crear_dim_nino(df):
    """
    Dimensi√≥n NI√ëO CORREGIDA (manejo de duplicados)
    Granularidad: HHID + HC0 (cada ni√±o)
    """
    logger.log("Creando DIM_NINO...")
    
    # Cada registro es un ni√±o √∫nico
    nino_cols = ['HHID', 'HC0', 'HC1', 'HC27', 'BORD', 'HC70', 'HW71']
    nino_cols_disponibles = [col for col in nino_cols if col in df.columns]
    
    # INVESTIGAR DUPLICADOS
    duplicados = df.duplicated(subset=['HHID', 'HC0']).sum()
    if duplicados > 0:
        logger.log(f"  ‚ö†Ô∏è  Encontrados {duplicados} duplicados en HHID+HC0")
        
        # Analizar los duplicados
        dup_info = df[df.duplicated(subset=['HHID', 'HC0'], keep=False)]
        logger.log(f"  ‚ö†Ô∏è  Total registros duplicados (incluyendo originales): {len(dup_info)}")
        
        # Ver diferencias en columnas clave
        columnas_clave = ['HC1', 'HC27', 'BORD', 'ANEMIA']
        columnas_disponibles = [col for col in columnas_clave if col in dup_info.columns]
        
        if columnas_disponibles:
            dup_sample = dup_info.groupby(['HHID', 'HC0'])[columnas_disponibles].nunique()
            conflictos = (dup_sample > 1).any(axis=1).sum()
            logger.log(f"  ‚ö†Ô∏è  {conflictos} grupos con datos conflictivos")
    
    # ELIMINAR DUPLICADOS - mantener el primer registro
    df_nino = df[nino_cols_disponibles].drop_duplicates(subset=['HHID', 'HC0'], keep='first').copy()
    
    # Crear ID √∫nico
    df_nino['id_nino'] = range(1, len(df_nino) + 1)
    
    # Crear categor√≠as
    if 'HC1' in df_nino.columns:
        df_nino['rango_edad'] = pd.cut(
            df_nino['HC1'],
            bins=[5, 12, 18, 24, 36],
            labels=['6-11m', '12-17m', '18-23m', '24-35m'],
            right=False
        ).astype(str)
    
    if 'HC70' in df_nino.columns:
        df_nino['tiene_desnutricion_cronica'] = (df_nino['HC70'] < -2).astype(int)
    
    if 'HW71' in df_nino.columns:
        df_nino['tiene_bajo_peso'] = (df_nino['HW71'] < -2).astype(int)
    
    if 'BORD' in df_nino.columns:
        df_nino['categoria_orden'] = pd.cut(
            df_nino['BORD'],
            bins=[0, 1, 3, 100],
            labels=['Primog√©nito', '2-3', '4+']
        ).astype(str)
    
    # Renombrar
    rename_map = {
        'HC1': 'edad_meses',
        'HC27': 'sexo',
        'BORD': 'orden_nacimiento'
    }
    
    df_nino.rename(columns={k: v for k, v in rename_map.items() if k in df_nino.columns}, 
                   inplace=True)
    
    # Reordenar
    cols = ['id_nino', 'HHID', 'HC0'] + [col for col in df_nino.columns 
                                          if col not in ['id_nino', 'HHID', 'HC0']]
    df_nino = df_nino[cols]
    
    logger.log(f"  ‚úì {len(df_nino)} registros creados (duplicados eliminados)")
    return df_nino


def crear_fact_anemia(df, dim_tiempo, dim_geo, dim_hogar, dim_madre, dim_nino):
    """
    Tabla de HECHOS - FACT_ANEMIA (CORREGIDA - MANEJO DE DUPLICADOS)
    """
    logger.log("Creando FACT_ANEMIA...")
    
    fact = df.copy()
    
    # 1. Mapear FK - TIEMPO CORREGIDO
    fact['id_tiempo'] = fact['ANIO'].astype(int) * 100 + 6
    
    # 2. ELIMINAR DUPLICADOS DEL DATASET ORIGINAL PRIMERO
    logger.log("  Eliminando duplicados del dataset original...")
    registros_antes = len(fact)
    fact = fact.drop_duplicates(subset=['HHID', 'HC0', 'ANIO'], keep='first')
    registros_despues = len(fact)
    
    if registros_antes != registros_despues:
        logger.log(f"  ‚ö†Ô∏è  Eliminados {registros_antes - registros_despues} duplicados de HHID+HC0+ANIO")
    
    # 3. Mapear FK - GEOGRAFIA
    logger.log("  Mapeando geograf√≠a...")
    fact = fact.merge(
        dim_geo[['id_geografia', 'departamento', 'area_residencia', 'altitud_msnm']],
        left_on=['HV024', 'HV025', 'HV040'],
        right_on=['departamento', 'area_residencia', 'altitud_msnm'],
        how='left'
    ).drop(columns=['departamento', 'area_residencia', 'altitud_msnm'])
    
    # 4. Mapear FK - HOGAR
    logger.log("  Mapeando hogar...")
    fact = fact.merge(
        dim_hogar[['id_hogar', 'HHID']],
        on='HHID',
        how='left'
    )
    
    # 5. Mapear FK - MADRE
    logger.log("  Mapeando madre...")
    fact = fact.merge(
        dim_madre[['id_madre', 'CASEID']],
        on='CASEID',
        how='left'
    )
    
    # 6. Mapear FK - NINO
    logger.log("  Mapeando ni√±o...")
    fact = fact.merge(
        dim_nino[['id_nino', 'HHID', 'HC0']],
        on=['HHID', 'HC0'],
        how='left'
    )
    
    # 7. Verificar duplicados finales
    logger.log("  Verificando duplicados finales...")
    duplicados_finales = fact.duplicated(subset=['id_nino', 'id_tiempo']).sum()
    if duplicados_finales > 0:
        logger.log(f"  ‚ö†Ô∏è  Eliminando {duplicados_finales} duplicados finales...")
        fact = fact.drop_duplicates(subset=['id_nino', 'id_tiempo'], keep='first')
    
    # 8. Seleccionar columnas de FACT
    fact_cols = [
        # FK
        'id_nino', 'id_tiempo', 'id_geografia', 'id_hogar', 'id_madre',
        
        # IDs originales (trazabilidad)
        'HHID', 'CASEID', 'HC0',
        
        # M√©tricas antropom√©tricas
        'HW2', 'HW3', 'HC70', 'HW70', 'HW71', 'HW72', 'HW73',
        
        # Target
        'ANEMIA', 'HC57',
        
        # Pesos
        'PESO',
        
        # Flags de calidad
        'HC55', 'HV015', 'HV103'
    ]
    
    # Filtrar solo columnas existentes
    fact_cols_disponibles = [col for col in fact_cols if col in fact.columns]
    fact = fact[fact_cols_disponibles]
    
    # 9. Renombrar columnas
    rename_map = {
        'HW2': 'peso_kg',
        'HW3': 'talla_cm',
        'HC70': 'z_talla_edad',
        'HW70': 'z_talla_edad_alt',
        'HW71': 'z_peso_edad',
        'HW72': 'z_peso_talla',
        'HW73': 'z_imc',
        'ANEMIA': 'tiene_anemia',
        'HC57': 'nivel_anemia',
        'PESO': 'peso_muestral',
        'HC55': 'medicion_valida',
        'HV015': 'cuestionario_ok',
        'HV103': 'durmio_anoche'
    }
    
    fact.rename(columns={k: v for k, v in rename_map.items() if k in fact.columns}, 
                inplace=True)
    
    logger.log(f"  ‚úì {len(fact)} registros creados (sin duplicados)")
    
    return fact


def crear_fact_anemia(df, dim_tiempo, dim_geo, dim_hogar, dim_madre, dim_nino):
    """
    Tabla de HECHOS - FACT_ANEMIA (CORREGIDA - ELIMINACI√ìN COMPLETA DE DUPLICADOS)
    """
    logger.log("Creando FACT_ANEMIA...")
    
    # HACER UNA COPIA Y ELIMINAR DUPLICADOS INMEDIATAMENTE
    fact = df.copy()
    
    # 1. ELIMINAR TODOS LOS DUPLICADOS DEL DATASET ORIGINAL PRIMERO
    logger.log("  Eliminando duplicados del dataset original...")
    registros_antes = len(fact)
    
    # Identificar la clave √∫nica para cada ni√±o por a√±o
    fact = fact.drop_duplicates(subset=['HHID', 'HC0', 'ANIO'], keep='first')
    
    registros_despues = len(fact)
    duplicados_eliminados = registros_antes - registros_despues
    
    if duplicados_eliminados > 0:
        logger.log(f"  ‚ö†Ô∏è  Eliminados {duplicados_eliminados} duplicados de HHID+HC0+ANIO")
        logger.log(f"  ‚ö†Ô∏è  Registros antes: {registros_antes:,}, despu√©s: {registros_despues:,}")
    
    # 2. Mapear FK - TIEMPO CORREGIDO
    fact['id_tiempo'] = fact['ANIO'].astype(int) * 100 + 6
    
    # 3. Mapear FK - GEOGRAFIA (sin validaci√≥n estricta)
    logger.log("  Mapeando geograf√≠a...")
    fact = fact.merge(
        dim_geo[['id_geografia', 'departamento', 'area_residencia', 'altitud_msnm']],
        left_on=['HV024', 'HV025', 'HV040'],
        right_on=['departamento', 'area_residencia', 'altitud_msnm'],
        how='left'
    ).drop(columns=['departamento', 'area_residencia', 'altitud_msnm'])
    
    # 4. Mapear FK - HOGAR (sin validaci√≥n estricta)
    logger.log("  Mapeando hogar...")
    fact = fact.merge(
        dim_hogar[['id_hogar', 'HHID']],
        on='HHID',
        how='left'
    )
    
    # 5. Mapear FK - MADRE (sin validaci√≥n estricta)
    logger.log("  Mapeando madre...")
    fact = fact.merge(
        dim_madre[['id_madre', 'CASEID']],
        on='CASEID',
        how='left'
    )
    
    # 6. Mapear FK - NINO (SIN VALIDACI√ìN - manejaremos duplicados despu√©s)
    logger.log("  Mapeando ni√±o...")
    fact = fact.merge(
        dim_nino[['id_nino', 'HHID', 'HC0']],
        on=['HHID', 'HC0'],
        how='left'
    )
    
    # 7. VERIFICAR Y ELIMINAR CUALQUIER DUPLICADO RESTANTE
    logger.log("  Verificando duplicados finales...")
    
    # Verificar duplicados por la clave natural del hecho
    duplicados_natural = fact.duplicated(subset=['HHID', 'HC0', 'id_tiempo']).sum()
    if duplicados_natural > 0:
        logger.log(f"  ‚ö†Ô∏è  Eliminando {duplicados_natural} duplicados naturales...")
        fact = fact.drop_duplicates(subset=['HHID', 'HC0', 'id_tiempo'], keep='first')
    
    # Verificar duplicados por la clave del hecho
    duplicados_fk = fact.duplicated(subset=['id_nino', 'id_tiempo']).sum()
    if duplicados_fk > 0:
        logger.log(f"  ‚ö†Ô∏è  Eliminando {duplicados_fk} duplicados por FK...")
        fact = fact.drop_duplicates(subset=['id_nino', 'id_tiempo'], keep='first')
    
    # 8. VERIFICAR INTEGRIDAD DE LAS FK
    logger.log("  Verificando integridad de FK...")
    
    # Verificar si hay registros sin FK de ni√±o (esto indicar√≠a problemas serios)
    sin_nino = fact['id_nino'].isna().sum()
    if sin_nino > 0:
        logger.log(f"  ‚ö†Ô∏è  {sin_nino} registros sin FK de ni√±o")
        # Podemos eliminar estos registros o mantenerlos seg√∫n el caso
        # fact = fact.dropna(subset=['id_nino'])
    
    # 9. Seleccionar columnas de FACT
    fact_cols = [
        # FK
        'id_nino', 'id_tiempo', 'id_geografia', 'id_hogar', 'id_madre',
        
        # IDs originales (trazabilidad)
        'HHID', 'CASEID', 'HC0',
        
        # M√©tricas antropom√©tricas
        'HW2', 'HW3', 'HC70', 'HW70', 'HW71', 'HW72', 'HW73',
        
        # Target
        'ANEMIA', 'HC57',
        
        # Pesos
        'PESO',
        
        # Flags de calidad
        'HC55', 'HV015', 'HV103'
    ]
    
    # Filtrar solo columnas existentes
    fact_cols_disponibles = [col for col in fact_cols if col in fact.columns]
    fact = fact[fact_cols_disponibles]
    
    # 10. Renombrar columnas
    rename_map = {
        'HW2': 'peso_kg',
        'HW3': 'talla_cm',
        'HC70': 'z_talla_edad',
        'HW70': 'z_talla_edad_alt',
        'HW71': 'z_peso_edad',
        'HW72': 'z_peso_talla',
        'HW73': 'z_imc',
        'ANEMIA': 'tiene_anemia',
        'HC57': 'nivel_anemia',
        'PESO': 'peso_muestral',
        'HC55': 'medicion_valida',
        'HV015': 'cuestionario_ok',
        'HV103': 'durmio_anoche'
    }
    
    fact.rename(columns={k: v for k, v in rename_map.items() if k in fact.columns}, 
                inplace=True)
    
    logger.log(f"  ‚úì {len(fact)} registros creados (limpio de duplicados)")
    
    return fact

# ============================================================
# VALIDACIONES
# ============================================================

def validar_integridad_referencial(fact, dims):
    """Validar FK en tabla de hechos"""
    logger.log("\nValidando integridad referencial...")
    
    errores = []
    
    # Verificar cada FK
    checks = [
        ('id_tiempo', dims['dim_tiempo'], 'id_tiempo'),
        ('id_geografia', dims['dim_geografia'], 'id_geografia'),
        ('id_hogar', dims['dim_hogar'], 'id_hogar'),
        ('id_madre', dims['dim_madre'], 'id_madre'),
        ('id_nino', dims['dim_nino'], 'id_nino')
    ]
    
    for fk_col, dim_df, dim_pk in checks:
        if fk_col in fact.columns:
            invalidos = ~fact[fk_col].isin(dim_df[dim_pk])
            n_invalidos = invalidos.sum()
            
            if n_invalidos > 0:
                errores.append(f"  ‚ùå {fk_col}: {n_invalidos} FK inv√°lidos")
            else:
                logger.log(f"  ‚úì {fk_col}: OK")
    
    if errores:
        logger.log("\n‚ö†Ô∏è  ERRORES DE INTEGRIDAD ENCONTRADOS:")
        for error in errores:
            logger.log(error)
        raise ValueError("Integridad referencial violada")
    else:
        logger.log("  ‚úÖ Integridad referencial OK")


def validar_calidad_datos(fact, dims):
    """Validar calidad de datos"""
    logger.log("\nValidando calidad de datos...")
    
    # 1. Registros en fact
    logger.log(f"  Total registros FACT: {len(fact):,}")
    
    # 2. Duplicados
    duplicados = fact.duplicated(subset=['id_nino', 'id_tiempo']).sum()
    if duplicados > 0:
        logger.log(f"  ‚ö†Ô∏è  Duplicados encontrados: {duplicados}")
    else:
        logger.log(f"  ‚úì Sin duplicados")
    
    # 3. Missings en FK
    for col in ['id_nino', 'id_tiempo', 'id_geografia', 'id_hogar', 'id_madre']:
        if col in fact.columns:
            missing = fact[col].isna().sum()
            if missing > 0:
                logger.log(f"  ‚ö†Ô∏è  {col}: {missing} missings ({missing/len(fact)*100:.1f}%)")
    
    # 4. Estad√≠sticas de dimensiones
    logger.log(f"\nDimensiones:")
    logger.log(f"  DIM_TIEMPO:    {len(dims['dim_tiempo']):>6,} registros")
    logger.log(f"  DIM_GEOGRAFIA: {len(dims['dim_geografia']):>6,} registros")
    logger.log(f"  DIM_HOGAR:     {len(dims['dim_hogar']):>6,} registros")
    logger.log(f"  DIM_MADRE:     {len(dims['dim_madre']):>6,} registros")
    logger.log(f"  DIM_NINO:      {len(dims['dim_nino']):>6,} registros")
    
    logger.log("\n  ‚úÖ Validaci√≥n de calidad completada")


# ============================================================
# GUARDAR EN SQLITE
# ============================================================

def crear_esquema_sqlite(conn):
    """Crear estructura de tablas con DDL"""
    logger.log("\nCreando esquema SQLite...")
    
    cursor = conn.cursor()
    
    # DDL para cada tabla (simplificado - SQLite infiere tipos)
    cursor.execute("DROP TABLE IF EXISTS fact_anemia")
    cursor.execute("DROP TABLE IF EXISTS dim_tiempo")
    cursor.execute("DROP TABLE IF EXISTS dim_geografia")
    cursor.execute("DROP TABLE IF EXISTS dim_hogar")
    cursor.execute("DROP TABLE IF EXISTS dim_madre")
    cursor.execute("DROP TABLE IF EXISTS dim_nino")
    
    conn.commit()
    logger.log("  ‚úì Tablas anteriores eliminadas (si exist√≠an)")


def guardar_en_sqlite(fact, dims, db_path):
    """Guardar modelo estrella en SQLite"""
    logger.log(f"\nGuardando en SQLite: {db_path}")
    
    conn = sqlite3.connect(db_path)
    
    # Crear esquema
    crear_esquema_sqlite(conn)
    
    # Guardar dimensiones
    logger.log("  Guardando dimensiones...")
    dims['dim_tiempo'].to_sql('dim_tiempo', conn, if_exists='replace', index=False)
    dims['dim_geografia'].to_sql('dim_geografia', conn, if_exists='replace', index=False)
    dims['dim_hogar'].to_sql('dim_hogar', conn, if_exists='replace', index=False)
    dims['dim_madre'].to_sql('dim_madre', conn, if_exists='replace', index=False)
    dims['dim_nino'].to_sql('dim_nino', conn, if_exists='replace', index=False)
    
    # Guardar hechos (en chunks)
    logger.log("  Guardando tabla de hechos...")
    fact.to_sql('fact_anemia', conn, if_exists='replace', 
                index=False, chunksize=config.CHUNK_SIZE)
    
    # Crear √≠ndices
    logger.log("  Creando √≠ndices...")
    cursor = conn.cursor()
    
    indices = [
        "CREATE INDEX idx_fact_tiempo ON fact_anemia(id_tiempo)",
        "CREATE INDEX idx_fact_geografia ON fact_anemia(id_geografia)",
        "CREATE INDEX idx_fact_hogar ON fact_anemia(id_hogar)",
        "CREATE INDEX idx_fact_madre ON fact_anemia(id_madre)",
        "CREATE INDEX idx_fact_nino ON fact_anemia(id_nino)",
        "CREATE INDEX idx_fact_anemia ON fact_anemia(tiene_anemia)",
        "CREATE INDEX idx_dim_geo_dept ON dim_geografia(departamento)",
        "CREATE INDEX idx_dim_tiempo_anio ON dim_tiempo(anio)"
    ]
    
    for idx_sql in indices:
        try:
            cursor.execute(idx_sql)
        except Exception as e:
            logger.log(f"    ‚ö†Ô∏è  Error creando √≠ndice: {e}")
    
    conn.commit()
    conn.close()
    
    logger.log("  ‚úÖ Guardado en SQLite completado")


def guardar_backup_parquet(fact, dims):
    """Backup en Parquet"""
    if not config.BACKUP_PARQUET:
        return
    
    logger.log(f"\nCreando backup Parquet en {config.BACKUP_DIR}...")
    
    fact.to_parquet(config.BACKUP_DIR / "fact_anemia.parquet", index=False)
    dims['dim_tiempo'].to_parquet(config.BACKUP_DIR / "dim_tiempo.parquet", index=False)
    dims['dim_geografia'].to_parquet(config.BACKUP_DIR / "dim_geografia.parquet", index=False)
    dims['dim_hogar'].to_parquet(config.BACKUP_DIR / "dim_hogar.parquet", index=False)
    dims['dim_madre'].to_parquet(config.BACKUP_DIR / "dim_madre.parquet", index=False)
    dims['dim_nino'].to_parquet(config.BACKUP_DIR / "dim_nino.parquet", index=False)
    
    logger.log("  ‚úÖ Backup Parquet completado")


def generar_metadata(fact, dims):
    """Generar metadata del warehouse"""
    logger.log("\nGenerando metadata...")
    
    metadata = {
        'fecha_creacion': datetime.now().isoformat(),
        'registros': {
            'fact_anemia': len(fact),
            'dim_tiempo': len(dims['dim_tiempo']),
            'dim_geografia': len(dims['dim_geografia']),
            'dim_hogar': len(dims['dim_hogar']),
            'dim_madre': len(dims['dim_madre']),
            'dim_nino': len(dims['dim_nino'])
        },
        'columnas': {
            'fact_anemia': list(fact.columns),
            'dim_tiempo': list(dims['dim_tiempo'].columns),
            'dim_geografia': list(dims['dim_geografia'].columns),
            'dim_hogar': list(dims['dim_hogar'].columns),
            'dim_madre': list(dims['dim_madre'].columns),
            'dim_nino': list(dims['dim_nino'].columns)
        },
        'prevalencia_anemia': float(fact['tiene_anemia'].mean() * 100) if 'tiene_anemia' in fact.columns else None
    }
    
    # Guardar como JSON
    with open(config.METADATA_DIR / 'estadisticas.json', 'w', encoding='utf-8') as f:
        json.dump(metadata, f, indent=2, ensure_ascii=False)
    
    logger.log("  ‚úì Metadata guardada")


# ============================================================
# PIPELINE PRINCIPAL
# ============================================================

def main():
    """Pipeline completo ETL"""
    
    logger.log("="*60)
    logger.log("INICIANDO GENERACI√ìN DE MODELO ESTRELLA")
    logger.log("="*60)
    
    try:
        # 1. EXTRACT
        logger.log(f"\n1. Cargando CSV: {config.CSV_INPUT}")
        df = pd.read_csv(config.CSV_INPUT, encoding='utf-8-sig')
        logger.log(f"   ‚úì {len(df):,} registros cargados")
        logger.log(f"   ‚úì {df.shape[1]} columnas")
        
        # 2. TRANSFORM - Crear dimensiones
        logger.log("\n2. Creando dimensiones...")
        dim_tiempo = crear_dim_tiempo(df)
        dim_geografia = crear_dim_geografia(df)
        dim_hogar = crear_dim_hogar(df)
        dim_madre = crear_dim_madre(df)
        dim_nino = crear_dim_nino(df)
        
        dims = {
            'dim_tiempo': dim_tiempo,
            'dim_geografia': dim_geografia,
            'dim_hogar': dim_hogar,
            'dim_madre': dim_madre,
            'dim_nino': dim_nino
        }
        
        # 3. TRANSFORM - Crear hechos
        logger.log("\n3. Creando tabla de hechos...")
        fact = crear_fact_anemia(df, dim_tiempo, dim_geografia, 
                                dim_hogar, dim_madre, dim_nino)
        
        # 4. VALIDACIONES
        logger.log("\n4. Validando modelo...")
        validar_integridad_referencial(fact, dims)
        validar_calidad_datos(fact, dims)
        
        # 5. LOAD
        logger.log("\n5. Cargando a Data Warehouse...")
        guardar_en_sqlite(fact, dims, config.DB_OUTPUT)
        guardar_backup_parquet(fact, dims)
        generar_metadata(fact, dims)
        
        # FIN
        logger.log("\n" + "="*60)
        logger.log("‚úÖ MODELO ESTRELLA GENERADO EXITOSAMENTE")
        logger.log("="*60)
        logger.log(f"\nüìÅ Archivos generados:")
        logger.log(f"   ‚Ä¢ SQLite:  {config.DB_OUTPUT}")
        logger.log(f"   ‚Ä¢ Metadata: {config.METADATA_DIR}")
        logger.log(f"   ‚Ä¢ Backup:   {config.BACKUP_DIR}")
        
        # Estad√≠sticas finales
        tamanio_db = Path(config.DB_OUTPUT).stat().st_size / (1024*1024)
        logger.log(f"\nüìä Estad√≠sticas:")
        logger.log(f"   ‚Ä¢ Tama√±o DB: {tamanio_db:.2f} MB")
        logger.log(f"   ‚Ä¢ Total registros FACT: {len(fact):,}")
        logger.log(f"   ‚Ä¢ Prevalencia anemia: {fact['tiene_anemia'].mean()*100:.2f}%")
        
    except Exception as e:
        logger.log(f"\n‚ùå ERROR: {str(e)}")
        import traceback
        logger.log(traceback.format_exc())
        raise
    
    finally:
        logger.finalizar()


if __name__ == "__main__":
    main()

[2025-11-04 02:07:44] INICIANDO GENERACI√ìN DE MODELO ESTRELLA
[2025-11-04 02:07:44] 
1. Cargando CSV: D:\Bases_train_test\endes_2015_2024_consolidado.csv
[2025-11-04 02:07:44]    ‚úì 131,470 registros cargados
[2025-11-04 02:07:44]    ‚úì 42 columnas
[2025-11-04 02:07:44] 
2. Creando dimensiones...
[2025-11-04 02:07:44] Creando DIM_TIEMPO (CORREGIDA - 1 registro por a√±o)...
[2025-11-04 02:07:44]   ‚úì 10 registros creados (1 por a√±o)
[2025-11-04 02:07:44] Creando DIM_GEOGRAFIA...
[2025-11-04 02:07:45]   ‚úì 8090 registros creados
[2025-11-04 02:07:45] Creando DIM_HOGAR...
[2025-11-04 02:07:45]   ‚úì 94921 registros creados
[2025-11-04 02:07:45] Creando DIM_MADRE...
[2025-11-04 02:07:45]   ‚úì 99928 registros creados
[2025-11-04 02:07:45] Creando DIM_NINO...
[2025-11-04 02:07:45]   ‚ö†Ô∏è  Encontrados 26552 duplicados en HHID+HC0
[2025-11-04 02:07:45]   ‚ö†Ô∏è  Total registros duplicados (incluyendo originales): 50674
[2025-11-04 02:07:45]   ‚ö†Ô∏è  1540 grupos con datos conflictivos

In [1]:
"""
GENERADOR DE MODELO ESTRELLA - ANEMIA INFANTIL ENDES (VERSI√ìN FINAL)
Convierte CSV consolidado ‚Üí SQLite con esquema estrella
CORRECCIONES:
- dim_tiempo: 1 registro por a√±o
- Manejo de duplicados en dim_nino y fact_anemia
- Funci√≥n crear_fact_anemia sin duplicaci√≥n
"""

import pandas as pd
import numpy as np
import sqlite3
from pathlib import Path
from datetime import datetime
import json
import warnings
warnings.filterwarnings('ignore')

# ============================================================
# CONFIGURACI√ìN
# ============================================================
class Config:
    CSV_INPUT = r"D:\Bases_train_test\endes_2015_2024_consolidado.csv"
    DB_OUTPUT = r"D:\Data_Warehouse\anemia_dwh.db"
    METADATA_DIR = Path(r"D:\Data_Warehouse\metadata")
    BACKUP_DIR = Path(r"D:\Data_Warehouse\backup")
    
    METADATA_DIR.mkdir(parents=True, exist_ok=True)
    BACKUP_DIR.mkdir(parents=True, exist_ok=True)
    
    CHUNK_SIZE = 5000
    BACKUP_PARQUET = True

config = Config()

# ============================================================
# LOGGER
# ============================================================
class Logger:
    def __init__(self, log_file):
        self.log_file = log_file
        self.start_time = datetime.now()
        
    def log(self, mensaje):
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        msg = f"[{timestamp}] {mensaje}"
        print(msg)
        with open(self.log_file, 'a', encoding='utf-8') as f:
            f.write(msg + '\n')
    
    def finalizar(self):
        duracion = (datetime.now() - self.start_time).total_seconds()
        self.log(f"\n{'='*60}")
        self.log(f"Proceso completado en {duracion:.2f} segundos")
        self.log(f"{'='*60}")

log_file = config.METADATA_DIR / f"log_etl_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
logger = Logger(log_file)

# ============================================================
# DIMENSIONES
# ============================================================

def crear_dim_tiempo(df):
    """Dimensi√≥n TIEMPO - 1 registro por a√±o"""
    logger.log("Creando DIM_TIEMPO...")
    
    anios = sorted(df['ANIO'].dropna().unique())
    
    dim_tiempo = []
    for anio in anios:
        anio_int = int(anio)
        dim_tiempo.append({
            'id_tiempo': anio_int * 100 + 6,
            'anio': anio_int,
            'mes': 6,
            'trimestre': 2,
            'semestre': 1,
            'nombre_mes': 'Junio',
            'quinquenio': f"{(anio_int//5)*5}-{(anio_int//5)*5+4}",
            'periodo_covid': int(anio_int in [2020, 2021])
        })
    
    df_tiempo = pd.DataFrame(dim_tiempo)
    logger.log(f"  ‚úì {len(df_tiempo)} registros creados")
    return df_tiempo


def crear_dim_geografia(df):
    """Dimensi√≥n GEOGRAF√çA"""
    logger.log("Creando DIM_GEOGRAFIA...")
    
    geo_cols = ['HV024', 'HV025', 'HV040']
    df_geo = df[geo_cols].drop_duplicates().reset_index(drop=True)
    df_geo['id_geografia'] = range(1, len(df_geo) + 1)
    
    mapa_region = {
        'tumbes': 'Costa', 'piura': 'Costa', 'lambayeque': 'Costa', 
        'la libertad': 'Costa', 'ancash': 'Costa', 'lima': 'Costa',
        'ica': 'Costa', 'arequipa': 'Costa', 'moquegua': 'Costa', 
        'tacna': 'Costa', 'callao': 'Costa',
        'cajamarca': 'Sierra', 'huanuco': 'Sierra', 'pasco': 'Sierra',
        'junin': 'Sierra', 'huancavelica': 'Sierra', 'ayacucho': 'Sierra',
        'apurimac': 'Sierra', 'cusco': 'Sierra', 'puno': 'Sierra',
        'loreto': 'Selva', 'amazonas': 'Selva', 'san martin': 'Selva',
        'ucayali': 'Selva', 'madre de dios': 'Selva'
    }
    
    df_geo['region_natural'] = df_geo['HV024'].str.lower().str.strip().map(mapa_region).fillna('Otro')
    
    df_geo['rango_altitud'] = pd.cut(
        df_geo['HV040'],
        bins=[-1, 500, 1500, 2500, 5000],
        labels=['<500m', '500-1500m', '1500-2500m', '>2500m']
    ).astype(str)
    
    df_geo.rename(columns={
        'HV024': 'departamento',
        'HV025': 'area_residencia',
        'HV040': 'altitud_msnm'
    }, inplace=True)
    
    df_geo = df_geo[['id_geografia', 'departamento', 'region_natural', 
                     'area_residencia', 'altitud_msnm', 'rango_altitud']]
    
    logger.log(f"  ‚úì {len(df_geo)} registros creados")
    return df_geo


def crear_dim_hogar(df):
    """Dimensi√≥n HOGAR"""
    logger.log("Creando DIM_HOGAR...")
    
    hogar_cols = ['HHID', 'HV009', 'HV271', 'V190', 'HV206', 'HV201', 'HV205', 'HV237']
    hogar_cols_disp = [col for col in hogar_cols if col in df.columns]
    
    df_hogar = df[hogar_cols_disp].drop_duplicates(subset='HHID').reset_index(drop=True)
    df_hogar['id_hogar'] = range(1, len(df_hogar) + 1)
    
    if 'HV009' in df_hogar.columns:
        df_hogar['categoria_tamano'] = pd.cut(
            df_hogar['HV009'],
            bins=[0, 3, 5, 100],
            labels=['Peque√±o', 'Mediano', 'Grande']
        ).astype(str)
    
    rename_map = {
        'HV009': 'num_miembros', 'HV271': 'indice_riqueza_num',
        'V190': 'quintil_riqueza', 'HV206': 'tiene_electricidad',
        'HV201': 'fuente_agua', 'HV205': 'tipo_saneamiento', 'HV237': 'trata_agua'
    }
    df_hogar.rename(columns={k: v for k, v in rename_map.items() if k in df_hogar.columns}, inplace=True)
    
    cols = ['id_hogar', 'HHID'] + [c for c in df_hogar.columns if c not in ['id_hogar', 'HHID']]
    df_hogar = df_hogar[cols]
    
    logger.log(f"  ‚úì {len(df_hogar)} registros creados")
    return df_hogar


def crear_dim_madre(df):
    """Dimensi√≥n MADRE"""
    logger.log("Creando DIM_MADRE...")
    
    madre_cols = ['CASEID', 'V012', 'V106', 'V133', 'V025']
    madre_cols_disp = [col for col in madre_cols if col in df.columns]
    
    df_madre = df[madre_cols_disp].drop_duplicates(subset='CASEID').reset_index(drop=True)
    df_madre['id_madre'] = range(1, len(df_madre) + 1)
    
    if 'V012' in df_madre.columns:
        df_madre['rango_edad'] = pd.cut(df_madre['V012'], bins=[0, 20, 35, 100], 
                                         labels=['<20', '20-34', '35+']).astype(str)
        df_madre['es_madre_adolescente'] = (df_madre['V012'] < 20).astype(int)
    
    if 'V133' in df_madre.columns:
        df_madre['categoria_educacion'] = pd.cut(df_madre['V133'], bins=[-1, 6, 12, 100],
                                                  labels=['Baja', 'Media', 'Alta']).astype(str)
    
    rename_map = {'V012': 'edad_actual', 'V106': 'nivel_educativo', 
                  'V133': 'anios_educacion', 'V025': 'area_residencia_madre'}
    df_madre.rename(columns={k: v for k, v in rename_map.items() if k in df_madre.columns}, inplace=True)
    
    cols = ['id_madre', 'CASEID'] + [c for c in df_madre.columns if c not in ['id_madre', 'CASEID']]
    df_madre = df_madre[cols]
    
    logger.log(f"  ‚úì {len(df_madre)} registros creados")
    return df_madre


def crear_dim_nino(df):
    """Dimensi√≥n NI√ëO - Elimina duplicados"""
    logger.log("Creando DIM_NINO...")
    
    nino_cols = ['HHID', 'HC0', 'HC1', 'HC27', 'BORD', 'HC70', 'HW71']
    nino_cols_disp = [col for col in nino_cols if col in df.columns]
    
    # Eliminar duplicados ANTES de crear dimensi√≥n
    duplicados = df.duplicated(subset=['HHID', 'HC0']).sum()
    if duplicados > 0:
        logger.log(f"  ‚ö†Ô∏è  {duplicados} duplicados HHID+HC0 encontrados, eliminando...")
    
    df_nino = df[nino_cols_disp].drop_duplicates(subset=['HHID', 'HC0'], keep='first').copy()
    df_nino['id_nino'] = range(1, len(df_nino) + 1)
    
    if 'HC1' in df_nino.columns:
        df_nino['rango_edad'] = pd.cut(df_nino['HC1'], bins=[5, 12, 18, 24, 36],
                                        labels=['6-11m', '12-17m', '18-23m', '24-35m'],
                                        right=False).astype(str)
    
    if 'HC70' in df_nino.columns:
        df_nino['tiene_desnutricion_cronica'] = (df_nino['HC70'] < -2).astype(int)
    
    if 'HW71' in df_nino.columns:
        df_nino['tiene_bajo_peso'] = (df_nino['HW71'] < -2).astype(int)
    
    if 'BORD' in df_nino.columns:
        df_nino['categoria_orden'] = pd.cut(df_nino['BORD'], bins=[0, 1, 3, 100],
                                            labels=['Primog√©nito', '2-3', '4+']).astype(str)
    
    rename_map = {'HC1': 'edad_meses', 'HC27': 'sexo', 'BORD': 'orden_nacimiento'}
    df_nino.rename(columns={k: v for k, v in rename_map.items() if k in df_nino.columns}, inplace=True)
    
    cols = ['id_nino', 'HHID', 'HC0'] + [c for c in df_nino.columns if c not in ['id_nino', 'HHID', 'HC0']]
    df_nino = df_nino[cols]
    
    logger.log(f"  ‚úì {len(df_nino)} registros creados")
    return df_nino


def crear_fact_anemia(df, dim_tiempo, dim_geo, dim_hogar, dim_madre, dim_nino):
    """Tabla de HECHOS - Sin duplicados"""
    logger.log("Creando FACT_ANEMIA...")
    
    fact = df.copy()
    
    # 1. ELIMINAR DUPLICADOS PRIMERO
    registros_antes = len(fact)
    fact = fact.drop_duplicates(subset=['HHID', 'HC0', 'ANIO'], keep='first')
    registros_despues = len(fact)
    
    if registros_antes != registros_despues:
        logger.log(f"  ‚ö†Ô∏è  {registros_antes - registros_despues} duplicados eliminados")
    
    # 2. Mapear FK
    fact['id_tiempo'] = fact['ANIO'].astype(int) * 100 + 6
    
    # 3. Geograf√≠a
    fact = fact.merge(
        dim_geo[['id_geografia', 'departamento', 'area_residencia', 'altitud_msnm']],
        left_on=['HV024', 'HV025', 'HV040'],
        right_on=['departamento', 'area_residencia', 'altitud_msnm'],
        how='left'
    ).drop(columns=['departamento', 'area_residencia', 'altitud_msnm'])
    
    # 4. Hogar
    fact = fact.merge(dim_hogar[['id_hogar', 'HHID']], on='HHID', how='left')
    
    # 5. Madre
    fact = fact.merge(dim_madre[['id_madre', 'CASEID']], on='CASEID', how='left')
    
    # 6. Ni√±o
    fact = fact.merge(dim_nino[['id_nino', 'HHID', 'HC0']], on=['HHID', 'HC0'], how='left')
    
    # 7. Verificar duplicados finales
    dup_final = fact.duplicated(subset=['id_nino', 'id_tiempo']).sum()
    if dup_final > 0:
        logger.log(f"  ‚ö†Ô∏è  {dup_final} duplicados finales, eliminando...")
        fact = fact.drop_duplicates(subset=['id_nino', 'id_tiempo'], keep='first')
    
    # 8. Seleccionar columnas
    fact_cols = [
        'id_nino', 'id_tiempo', 'id_geografia', 'id_hogar', 'id_madre',
        'HHID', 'CASEID', 'HC0',
        'HW2', 'HW3', 'HC70', 'HW70', 'HW71', 'HW72', 'HW73',
        'ANEMIA', 'HC57', 'PESO',
        'HC55', 'HV015', 'HV103'
    ]
    
    fact_cols_disp = [col for col in fact_cols if col in fact.columns]
    fact = fact[fact_cols_disp]
    
    # 9. Renombrar
    rename_map = {
        'HW2': 'peso_kg', 'HW3': 'talla_cm', 'HC70': 'z_talla_edad',
        'HW70': 'z_talla_edad_alt', 'HW71': 'z_peso_edad', 'HW72': 'z_peso_talla',
        'HW73': 'z_imc', 'ANEMIA': 'tiene_anemia', 'HC57': 'nivel_anemia',
        'PESO': 'peso_muestral', 'HC55': 'medicion_valida',
        'HV015': 'cuestionario_ok', 'HV103': 'durmio_anoche'
    }
    fact.rename(columns={k: v for k, v in rename_map.items() if k in fact.columns}, inplace=True)
    
    logger.log(f"  ‚úì {len(fact)} registros creados")
    return fact


# ============================================================
# VALIDACIONES
# ============================================================

def validar_integridad_referencial(fact, dims):
    logger.log("\nValidando integridad referencial...")
    
    checks = [
        ('id_tiempo', dims['dim_tiempo'], 'id_tiempo'),
        ('id_geografia', dims['dim_geografia'], 'id_geografia'),
        ('id_hogar', dims['dim_hogar'], 'id_hogar'),
        ('id_madre', dims['dim_madre'], 'id_madre'),
        ('id_nino', dims['dim_nino'], 'id_nino')
    ]
    
    errores = []
    for fk_col, dim_df, dim_pk in checks:
        if fk_col in fact.columns:
            invalidos = ~fact[fk_col].isin(dim_df[dim_pk])
            n_invalidos = invalidos.sum()
            
            if n_invalidos > 0:
                errores.append(f"  ‚ùå {fk_col}: {n_invalidos} inv√°lidos")
            else:
                logger.log(f"  ‚úì {fk_col}: OK")
    
    if errores:
        logger.log("\n‚ö†Ô∏è ERRORES:")
        for e in errores:
            logger.log(e)
    else:
        logger.log("  ‚úÖ Integridad OK")


def validar_calidad_datos(fact, dims):
    logger.log("\nValidando calidad...")
    logger.log(f"  FACT: {len(fact):,} registros")
    
    dup = fact.duplicated(subset=['id_nino', 'id_tiempo']).sum()
    logger.log(f"  Duplicados: {dup}")
    
    logger.log(f"\nDimensiones:")
    logger.log(f"  DIM_TIEMPO:    {len(dims['dim_tiempo']):>6,}")
    logger.log(f"  DIM_GEOGRAFIA: {len(dims['dim_geografia']):>6,}")
    logger.log(f"  DIM_HOGAR:     {len(dims['dim_hogar']):>6,}")
    logger.log(f"  DIM_MADRE:     {len(dims['dim_madre']):>6,}")
    logger.log(f"  DIM_NINO:      {len(dims['dim_nino']):>6,}")


# ============================================================
# GUARDAR
# ============================================================

def guardar_en_sqlite(fact, dims, db_path):
    logger.log(f"\nGuardando en SQLite: {db_path}")
    
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    
    # Eliminar tablas anteriores
    for tabla in ['fact_anemia', 'dim_tiempo', 'dim_geografia', 'dim_hogar', 'dim_madre', 'dim_nino']:
        cursor.execute(f"DROP TABLE IF EXISTS {tabla}")
    conn.commit()
    
    # Guardar dimensiones
    logger.log("  Guardando dimensiones...")
    dims['dim_tiempo'].to_sql('dim_tiempo', conn, if_exists='replace', index=False)
    dims['dim_geografia'].to_sql('dim_geografia', conn, if_exists='replace', index=False)
    dims['dim_hogar'].to_sql('dim_hogar', conn, if_exists='replace', index=False)
    dims['dim_madre'].to_sql('dim_madre', conn, if_exists='replace', index=False)
    dims['dim_nino'].to_sql('dim_nino', conn, if_exists='replace', index=False)
    
    # Guardar hechos
    logger.log("  Guardando hechos...")
    fact.to_sql('fact_anemia', conn, if_exists='replace', index=False, chunksize=5000)
    
    # √çndices
    logger.log("  Creando √≠ndices...")
    indices = [
        "CREATE INDEX idx_fact_tiempo ON fact_anemia(id_tiempo)",
        "CREATE INDEX idx_fact_geografia ON fact_anemia(id_geografia)",
        "CREATE INDEX idx_fact_anemia ON fact_anemia(tiene_anemia)"
    ]
    
    for idx in indices:
        try:
            cursor.execute(idx)
        except Exception as e:
            logger.log(f"  ‚ö†Ô∏è Error √≠ndice: {e}")
    
    conn.commit()
    
    # Verificar
    cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
    tablas = cursor.fetchall()
    logger.log(f"\n  ‚úì Tablas creadas: {[t[0] for t in tablas]}")
    
    for tabla in ['fact_anemia', 'dim_tiempo', 'dim_geografia', 'dim_hogar', 'dim_madre', 'dim_nino']:
        cursor.execute(f"SELECT COUNT(*) FROM {tabla}")
        count = cursor.fetchone()[0]
        logger.log(f"  ‚úì {tabla}: {count:,} registros")
    
    conn.close()
    logger.log("  ‚úÖ Guardado completado")


# ============================================================
# MAIN
# ============================================================

def main():
    logger.log("="*60)
    logger.log("GENERACI√ìN DE MODELO ESTRELLA - VERSI√ìN FINAL")
    logger.log("="*60)
    
    try:
        # 1. Cargar
        logger.log(f"\n1. Cargando: {config.CSV_INPUT}")
        df = pd.read_csv(config.CSV_INPUT, encoding='utf-8-sig')
        logger.log(f"   ‚úì {len(df):,} registros, {df.shape[1]} columnas")
        
        # 2. Dimensiones
        logger.log("\n2. Creando dimensiones...")
        dim_tiempo = crear_dim_tiempo(df)
        dim_geografia = crear_dim_geografia(df)
        dim_hogar = crear_dim_hogar(df)
        dim_madre = crear_dim_madre(df)
        dim_nino = crear_dim_nino(df)
        
        dims = {
            'dim_tiempo': dim_tiempo,
            'dim_geografia': dim_geografia,
            'dim_hogar': dim_hogar,
            'dim_madre': dim_madre,
            'dim_nino': dim_nino
        }
        
        # 3. Hechos
        logger.log("\n3. Creando hechos...")
        fact = crear_fact_anemia(df, dim_tiempo, dim_geografia, dim_hogar, dim_madre, dim_nino)
        
        # 4. Validar
        logger.log("\n4. Validando...")
        validar_integridad_referencial(fact, dims)
        validar_calidad_datos(fact, dims)
        
        # 5. Guardar
        logger.log("\n5. Guardando...")
        guardar_en_sqlite(fact, dims, config.DB_OUTPUT)
        
        # Verificaci√≥n final
        logger.log("\n6. Verificaci√≥n final...")
        conn = sqlite3.connect(config.DB_OUTPUT)
        query = """
        SELECT t.anio, COUNT(*) as casos, SUM(f.tiene_anemia) as con_anemia,
               ROUND(AVG(f.tiene_anemia)*100, 2) as prevalencia
        FROM fact_anemia f
        JOIN dim_tiempo t ON f.id_tiempo = t.id_tiempo
        WHERE f.tiene_anemia IS NOT NULL
        GROUP BY t.anio
        ORDER BY t.anio
        """
        df_verif = pd.read_sql(query, conn)
        conn.close()
        
        logger.log("\nüìä Casos por a√±o:")
        logger.log(df_verif.to_string(index=False))
        
        # Stats finales
        tamanio = Path(config.DB_OUTPUT).stat().st_size / (1024*1024)
        logger.log(f"\n‚úÖ COMPLETADO")
        logger.log(f"   ‚Ä¢ Tama√±o DB: {tamanio:.2f} MB")
        logger.log(f"   ‚Ä¢ Total registros: {len(fact):,}")
        logger.log(f"   ‚Ä¢ Prevalencia: {fact['tiene_anemia'].mean()*100:.2f}%")
        
    except Exception as e:
        logger.log(f"\n‚ùå ERROR: {str(e)}")
        import traceback
        logger.log(traceback.format_exc())
        raise
    
    finally:
        logger.finalizar()


if __name__ == "__main__":
    main()

[2025-11-21 00:34:35] GENERACI√ìN DE MODELO ESTRELLA - VERSI√ìN FINAL
[2025-11-21 00:34:35] 
1. Cargando: D:\Bases_train_test\endes_2015_2024_consolidado.csv
[2025-11-21 00:34:35]    ‚úì 131,470 registros, 42 columnas
[2025-11-21 00:34:35] 
2. Creando dimensiones...
[2025-11-21 00:34:35] Creando DIM_TIEMPO...
[2025-11-21 00:34:35]   ‚úì 10 registros creados
[2025-11-21 00:34:35] Creando DIM_GEOGRAFIA...
[2025-11-21 00:34:35]   ‚úì 8373 registros creados
[2025-11-21 00:34:35] Creando DIM_HOGAR...
[2025-11-21 00:34:35]   ‚úì 94921 registros creados
[2025-11-21 00:34:35] Creando DIM_MADRE...
[2025-11-21 00:34:35]   ‚úì 99928 registros creados
[2025-11-21 00:34:35] Creando DIM_NINO...
[2025-11-21 00:34:35]   ‚ö†Ô∏è  26552 duplicados HHID+HC0 encontrados, eliminando...
[2025-11-21 00:34:35]   ‚úì 104918 registros creados
[2025-11-21 00:34:35] 
3. Creando hechos...
[2025-11-21 00:34:35] Creando FACT_ANEMIA...
[2025-11-21 00:34:35]   ‚ö†Ô∏è  24988 duplicados eliminados
[2025-11-21 00:34:36]  