# Generaci√≥n de Datos de Inversiones de Clientes - ETL

**Autor:** Diego Cuasapaz  
**Fecha:** 2026-02-21  
**Versi√≥n:** 1.0

## üìã Descripci√≥n General
Este notebook ejecuta el proceso completo de Extracci√≥n, Transformaci√≥n y Carga (ETL) de datos de inversiones de clientes bancarios. Genera 550 registros de inversiones iniciales con datos simulados, los transforma a trav√©s de tres zonas de datos (Raw, Curada y Productiva), y consolida la informaci√≥n con datos maestros de clientes desde PostgreSQL.

## üéØ Objetivos
1. Conectar a la base de datos PostgreSQL local con credenciales configuradas.
2. Cargar datos maestros de clientes desde la tabla `zp.td_datos_clientes`.
3. Generar 550 registros de inversiones con l√≥gica de negocio (pignoraciones, cancelaciones, renovaciones).
4. Crear listado diario particionado por per√≠odo de cada inversi√≥n vigente.
5. Realizar transformaciones y mapeos de columnas seg√∫n est√°ndares bancarios.
6. Cargar datos en tres zonas: Raw (ZR), Curada (ZC) y Productiva (ZP).
7. Consolidar inversiones con datos de clientes para an√°lisis integral.
8. Liberar recursos del sistema al finalizar.

## ‚öôÔ∏è Requisitos Previos
* **Librer√≠as:** `pandas`, `numpy`, `sqlalchemy`, `python-dotenv`.
* **Base de Datos:** PostgreSQL configurada con conexi√≥n local.
* **Variables de entorno:** `.env` con credenciales (DB_USER, DB_PASSWORD, DB_HOST, DB_PORT, DB_NAME).
* **Esquemas en PostgreSQL:** `zr_pas`, `zc_pas`, `zp` (deben existir).
* **Archivos generados:** CSV en `data/raw/`, `data/curada/`, `data/productiva/`.

## üìä Flujo de Datos
```
Clientes (PostgreSQL) 
    ‚Üì
Generaci√≥n de Inversiones (550 registros iniciales)
    ‚Üì
[Raw Zone] ‚Üí CSV y tabla particionada zr_pas.zr_fake_pas_inversiones
    ‚Üì
[Curada Zone] ‚Üí Transformaci√≥n y normalizaci√≥n ‚Üí zc_pas.zc_pas_inversiones
    ‚Üì
[Productiva Zone] ‚Üí Consolidaci√≥n con datos maestros ‚Üí zp.tn_pas_inversiones
    ‚Üì
Exportaci√≥n a CSV en zona productiva
```

## üîë Campos Principales Generados
- `numeroInversion`: Identificador √∫nico (INV-XXXXX)
- `codigoIdentificacionCliente`: Vinculaci√≥n con cliente
- `producto`: DPF Fijo, DPF Flexible o Certificado
- `estadoInversion`: VIGENTE, PIGNORADO, RENOVADO, PRE-CANCELADO, VENCIDO
- `montoAperturaInversion` y `tasaAperturaInversion`: Datos iniciales
- `interesDiaInversion` e `interesAcumuladoMesInversion`: C√°lculos de rendimiento
- `fechaProceso`, `fechaAperturaInversion`, `fechaVencimientoInversion`: Timeline
- `codigoPeriodo`: Formato YYYYMM para particionamiento

## ‚ö†Ô∏è Notas Importantes
- Utiliza particionamiento por per√≠odo en PostgreSQL para optimizar consultas hist√≥ricas.
- Implementa truncado de particiones existentes (l√≥gica de OVERWRITE).
- Las fechas se convierten a formato DATE en PostgreSQL para consistencia.
- Los booleanos se convierten a INTEGER (0/1) para compatibilidad.


## Paso 1: Configuraci√≥n de Estilos Visuales

Aplica estilos CSS personalizados a los DataFrames para que se visualicen con colores bancarios. Esto mejora la legibilidad de los datos durante el an√°lisis exploratorio.

**Caracter√≠sticas:**
- Encabezados con fondo azul marino (#1a237e)
- Filas alternas con fondo gris para mejor contraste
- Efecto hover para interactividad
- Fuente Segoe UI con tama√±o optimizado

In [415]:
from IPython.display import HTML, display

def aplicar_estilo_bancario():
    style = """
    <style>
        .dataframe {
            border-collapse: collapse;
            border: 2px solid #1a237e; /* Azul Marino Bancario */
            font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
            font-size: 12px;
        }
        .dataframe thead {
            background-color: #1a237e;
            color: white;
            text-align: center;
        }
        .dataframe th, .dataframe td {
            padding: 10px 15px;
            border: 1px solid #e0e0e0;
        }
        .dataframe tbody tr:nth-child(even) {
            background-color: #f5f5f5;
        }
        .dataframe tbody tr:hover {
            background-color: #e8eaf6; /* Resaltado suave al pasar el mouse */
            cursor: pointer;
        }
        .dataframe td {
            text-align: right;
        }
        /* Alinear a la izquierda columnas de texto */
        .dataframe td:nth-child(2), .dataframe td:nth-child(3) {
            text-align: left;
        }
    </style>
    """
    display(HTML(style))

aplicar_estilo_bancario()

## Paso 2: Importaci√≥n de Librer√≠as Requeridas

Carga todas las dependencias necesarias para el procesamiento ETL:

| Librer√≠a | Prop√≥sito |
|----------|----------|
| `pandas` | Manipulaci√≥n y an√°lisis de datos (DataFrames) |
| `numpy` | Operaciones num√©ricas y generaci√≥n de datos aleatorios |
| `sqlalchemy` | ORM y conexi√≥n con PostgreSQL |
| `dotenv` | Carga segura de credenciales desde archivo `.env` |
| `urllib.parse` | Codificaci√≥n de contrase√±as con caracteres especiales |
| `gc` | Garbage collection para liberar memoria |

In [416]:
from datetime import datetime, timedelta
import pandas as pd
import numpy as np
from sqlalchemy import create_engine, text, MetaData, Table, select, and_
from dotenv import load_dotenv
import os
import urllib.parse  # <--- Importante para caracteres especiales
import gc


## Paso 3: Configuraci√≥n de Par√°metros Globales

Define las variables que controlan el comportamiento del notebook:

**Par√°metros de Datos:**
- `N_INVERSIONES_INICIALES = 550`: Cantidad de inversiones a generar
- `FECHA_INICIO_MES = 2025-11-01`: Primer d√≠a del per√≠odo de an√°lisis
- `FECHA_FIN_MES = 2025-11-30`: √öltimo d√≠a del per√≠odo de an√°lisis

**Esquemas y Tablas en PostgreSQL:**
- **Zona Raw (ZR):** `zr_pas.zr_fake_pas_inversiones` (datos sin transformar)
- **Zona Curada (ZC):** `zc_pas.zc_pas_inversiones` (datos normalizados)
- **Zona Productiva (ZP):** `zp.tn_pas_inversiones` (datos finales consolidados)

In [417]:

N_INVERSIONES_INICIALES = 900
FECHA_INICIO_MES = datetime(2025, 12, 1)
FECHA_FIN_MES = datetime(2025, 12, 31)

P_FECHA_INCIO = FECHA_INICIO_MES.strftime('%Y%m%d')
P_FECHA_FIN = FECHA_FIN_MES.strftime('%Y%m%d')

# Nombres para objetos 
#################################
## ZONA RAW
#################################

nombre_esquema_zr_pas = 'zr_pas'
nombre_tabla_zr_pas_inversiones = 'zr_fake_pas_inversiones'

#################################
## ZONA CURADA
#################################

nombre_esquema_zc_pas = 'zc_pas'
nombre_tabla_zc_pas_inversiones = 'zc_pas_inversiones'

#################################
## ZONA PRODUCTIVA
#################################

nombre_esquema_zp = 'zp'
nombre_tabla_zp_cli = 'td_datos_clientes'

nombre_tabla_zp_pas_inversiones = 'tn_pas_inversiones'

## Paso 4: Conexi√≥n a PostgreSQL

Establece la conexi√≥n segura con la base de datos PostgreSQL usando credenciales del archivo `.env`.

**Proceso:**
1. Carga variables de entorno (DB_USER, DB_PASSWORD, DB_HOST, DB_PORT, DB_NAME)
2. Codifica la contrase√±a con `urllib.parse.quote_plus()` para manejar caracteres especiales
3. Crea engine de SQLAlchemy con la URL codificada
4. Valida la conexi√≥n con un intento de conexi√≥n

**Variables requeridas en `.env`:**
```
DB_USER=usuario
DB_PASSWORD=contrase√±a_con_caracteres_especiales
DB_HOST=localhost
DB_PORT=5432
DB_NAME=nombre_base_datos
```

In [418]:
# Forzar recarga del .env
load_dotenv(override=True)

user = os.getenv("DB_USER")
password = os.getenv("DB_PASSWORD")
host = os.getenv("DB_HOST")
port = os.getenv("DB_PORT")
db = os.getenv("DB_NAME")

try:
    engine = secure_create_engine(user, password, host, port, db)
    with engine.connect() as conn:
        logger.info("‚úÖ Conexi√≥n exitosa con el engine seguro.")
except Exception as e:
    logger.exception("‚ùå Error al crear o conectar el engine: %s", e)
    raise


2026-02-21 19:59:51,999 INFO etl_inversiones Secure engine created for localhost:5432/db_poc_bancs
2026-02-21 19:59:52,034 INFO etl_inversiones ‚úÖ Conexi√≥n exitosa con el engine seguro.


## Paso 5: Carga de Datos Maestros de Clientes

Obtiene la tabla de clientes desde PostgreSQL para usarla como referencia en la generaci√≥n de inversiones.

**Proceso:**
1. Crea un objeto `MetaData` para acceder a la estructura de la tabla
2. Usa `autoload_with` para reflejar la tabla `zp.td_datos_clientes` desde PostgreSQL
3. Construye una consulta SELECT din√°mica con SQLAlchemy ORM
4. Lee la tabla completa en un DataFrame de Pandas

**Resultado:** `df_clientes` contiene todos los clientes disponibles para vincular con las inversiones.

In [419]:
# 1. Crear un objeto MetaData (contenedor de la estructura)
metadata = MetaData()

# 2. Reflejar la tabla desde el esquema espec√≠fico
# Esto lee la estructura directamente de Postgres
tabla_clientes = Table(
    nombre_tabla_zp_cli, 
    metadata, 
    autoload_with=engine, 
    schema=nombre_esquema_zp
)

# 3. Construir la consulta de forma program√°tica (Equivalente a SELECT *)
stmt = select(tabla_clientes)

# 4. Cargar en Pandas
df_clientes = pd.read_sql(stmt, engine)

df_clientes.head()

Unnamed: 0,codigoSecuencialCliente,codigoPeriodo,codigoIdentificacionCliente,tipoIdentificacionCliente,numeroIdentificacionCliente,nombreCompletoCliente,segmentoCliente,scoreCrediticioCliente,provinciaCliente,ciudadCliente,fechaRegistroCliente,periodo
0,1000,202602,CUS-00001,C√âDULA,5080432512,ALBERTO LAUREANO GUEVARA,WEALTH,584,GUAYAS,GUAYAQUIL,2024-12-26,2026-02-21
1,1001,202602,CUS-00002,PASAPORTE,P48197146,CLARA BENJAM√çN TREJO,CORPORATIVO,495,GUAYAS,GUAYAQUIL,2022-05-22,2026-02-21
2,1002,202602,CUS-00003,C√âDULA,7050104985,ROSA PICHARDO APARICIO,CORPORATIVO,606,MANAB√ç,MANTA,2023-06-29,2026-02-21
3,1003,202602,CUS-00004,C√âDULA,0018204650,AURELIO GAMEZ GONZALES,WEALTH,963,LOJA,QUITO,2023-01-04,2026-02-21
4,1004,202602,CUS-00005,C√âDULA,4452464449,ING. ALDONZA VALENT√çN,WEALTH,313,GUAYAS,CUENCA,2022-12-17,2026-02-21


## Paso 6: Generaci√≥n de Dataset de Inversiones Sint√©tico

Crea 550 registros de inversiones con l√≥gica de negocio realista. Esta es la parte central del notebook.

**Parte 1 - Maestro de Operaciones:**
Genera registro √∫nico por inversi√≥n con:
- ID √∫nico (INV-00001...INV-00550)
- Cliente asignado aleatoriamente
- Producto (DPF Fijo/Flexible o Certificado)
- Monto inicial entre $5,000 y $150,000
- Tasa entre 6% y 9% anual
- Plazo: 90, 180 o 360 d√≠as
- Fecha de apertura (√∫ltimos 6 meses)
- L√≥gica de pignoraciones (5% de casos)
- Cancelaciones anticipadas (3% de casos)
- Tipo de renovaci√≥n (30% autom√°tica, 20% ventanilla, 50% sin renovaci√≥n)

**Parte 2 - Listado Diario:**
Crea un registro POR D√çA POR INVERSI√ìN (periodo completo), determinando:
- Estado: VIGENTE, PIGNORADO, RENOVADO, PRE-CANCELADO, VENCIDO
- Monto actual (0 si est√° cancelada)
- Inter√©s diario calculado
- Inter√©s acumulado mensual

**Parte 3 - Campos de Auditor√≠a:**
Agrega:
- `codigoPeriodo`: Formato YYYYMM para particionamiento
- `fecha_ingesta`: Timestamp actual con precisi√≥n de microsegundos
- `periodo`: Formato YYYYMMDD para filtros

In [420]:
def generar_dataset_inversiones_final(df_clientes):
    np.random.seed(42)
    
    # 1. MAESTRO DE OPERACIONES (Snapshot de Apertura)
    inv_maestro = pd.DataFrame({
        'operacion_id': [f"INV-{i:05d}" for i in range(N_INVERSIONES_INICIALES)],
        # Vinculamos con el c√≥digo secuencial (PK de tu tabla en Postgres)
        'codigoIdentificacionCliente': np.random.choice(df_clientes['codigoIdentificacionCliente'], N_INVERSIONES_INICIALES),
        'producto': np.random.choice(['DPF Fijo', 'DPF Flexible', 'Certificado'], N_INVERSIONES_INICIALES),
        'monto_apertura': np.random.uniform(5000, 150000, N_INVERSIONES_INICIALES).round(2),
        'tasa_apertura': np.random.uniform(0.06, 0.09, N_INVERSIONES_INICIALES).round(4),
        'fecha_apertura': [FECHA_INICIO_MES - timedelta(days=np.random.randint(0, 180)) for _ in range(N_INVERSIONES_INICIALES)],
        'plazo_dias': np.random.choice([90, 180, 360], N_INVERSIONES_INICIALES)
    })
    
    # C√°lculos base de fechas
    inv_maestro['fecha_vencimiento'] = inv_maestro.apply(
        lambda x: x['fecha_apertura'] + timedelta(days=int(x['plazo_dias'])), axis=1
    )

    # --- L√≥gica de Negocio: Pignoraciones y Cancelaciones ---
    inv_maestro['es_pignorado'] = np.random.rand(len(inv_maestro)) < 0.05
    inv_maestro['fecha_cancelacion_real'] = pd.NaT
    
    mask_precancelado = np.random.rand(len(inv_maestro)) < 0.03
    for idx in inv_maestro[mask_precancelado].index:
        dias_antes = np.random.randint(1, 15)
        inv_maestro.at[idx, 'fecha_cancelacion_real'] = inv_maestro.at[idx, 'fecha_vencimiento'] - timedelta(days=dias_antes)

    # --- L√≥gica de Negocio: Renovaciones ---
    inv_maestro['tipo_renovacion'] = np.random.choice(
        ['AUTOMATICA', 'POR VENTANILLA', 'SIN RENOVACION'], 
        len(inv_maestro), p=[0.30, 0.20, 0.50]
    )
    inv_maestro['instruccion_renovacion'] = np.random.choice(
        ['SOLO CAPITAL', 'CAPITAL + INTERES', 'NA'], 
        len(inv_maestro), p=[0.25, 0.25, 0.50]
    )
    inv_maestro.loc[inv_maestro['tipo_renovacion'] == 'SIN RENOVACION', 'instruccion_renovacion'] = 'NA'

    # 2. GENERACI√ìN DE HECHOS DIARIOS
    rango_fechas = pd.date_range(start=FECHA_INICIO_MES, end=FECHA_FIN_MES)
    listado_diario = []

    for fecha in rango_fechas:
        for _, row in inv_maestro.iterrows():
            if row['fecha_apertura'] > fecha: continue
            
            # DETERMINACI√ìN DEL ESTADO
            if not pd.isnull(row['fecha_cancelacion_real']) and fecha >= row['fecha_cancelacion_real']:
                estado = 'PRE-CANCELADO'
            elif row['fecha_vencimiento'] < fecha:
                estado = 'RENOVADO' if row['tipo_renovacion'] != 'SIN RENOVACION' else 'VENCIDO'
            elif row['es_pignorado']:
                estado = 'PIGNORADO'
            else:
                estado = 'VIGENTE'
            
            monto_actual = row['monto_apertura'] if estado not in ['CANCELADO', 'PRE-CANCELADO'] else 0
            interes_diario = (monto_actual * row['tasa_apertura']) / 360 if monto_actual > 0 else 0
            fecha_renovacion = row['fecha_vencimiento'] if row['tipo_renovacion'] != 'SIN RENOVACION' else pd.NaT
            
            listado_diario.append({
                'fecha_proceso': fecha,
                'operacion_id': row['operacion_id'],
                'codigoIdentificacionCliente': row['codigoIdentificacionCliente'], # Agregado aqu√≠
                'monto_apertura': row['monto_apertura'],
                'tasa_apertura': row['tasa_apertura'],
                'monto_actual': monto_actual,
                'interes_dia': round(interes_diario, 4),
                'estado': estado,
                'es_pignorado': row['es_pignorado'],
                'tipo_renovacion': row['tipo_renovacion'],
                'instruccion_renovacion': row['instruccion_renovacion'],
                'fecha_renovacion': fecha_renovacion,
                'fecha_apertura': row['fecha_apertura'],
                'fecha_vencimiento': row['fecha_vencimiento'],
                'fecha_cancelacion_real': row['fecha_cancelacion_real']
            })

    df_diario = pd.DataFrame(listado_diario)
    
    # 3. M√©trica Agregada: Inter√©s acumulado
    df_diario['interes_acumulado_mes'] = df_diario.groupby(['operacion_id', df_diario['fecha_proceso'].dt.month])['interes_dia'].cumsum()

    # ------------------ Data Quality Assertions ------------------
    try:
        assert inv_maestro['operacion_id'].is_unique, "operacion_id duplicados en maestro"
        assert inv_maestro['monto_apertura'].between(5000, 150000).all(), "Monto fuera de rango"
        assert inv_maestro['tasa_apertura'].between(0.06, 0.09).all(), "Tasa fuera de rango"
        assert inv_maestro['fecha_apertura'].notnull().all(), "Fecha apertura null"
        assert df_diario['monto_actual'].ge(0).all(), "Monto actual negativo"
        assert df_diario['interes_dia'].ge(0).all(), "Inter√©s diario negativo"
        assert df_diario['fecha_proceso'].min() >= FECHA_INICIO_MES, "Fecha_proceso menor a FECHA_INICIO_MES"
        assert df_diario['fecha_proceso'].max() <= FECHA_FIN_MES, "Fecha_proceso mayor a FECHA_FIN_MES"
    except AssertionError as ae:
        logger.error("Data quality assertion failed: %s", ae)
        raise
    
    df_diario['periodo'] = df_diario['fecha_proceso'].dt.strftime('%Y%m%d')

    
    return df_diario

# Ejecuci√≥n
df_fact_inversiones = generar_dataset_inversiones_final(df_clientes)
df_fact_inversiones.head()


Unnamed: 0,fecha_proceso,operacion_id,codigoIdentificacionCliente,monto_apertura,tasa_apertura,monto_actual,interes_dia,estado,es_pignorado,tipo_renovacion,instruccion_renovacion,fecha_renovacion,fecha_apertura,fecha_vencimiento,fecha_cancelacion_real,interes_acumulado_mes,periodo
0,2025-12-01,INV-00000,CUS-00103,133466.77,0.0837,133466.77,31.031,PIGNORADO,True,SIN RENOVACION,,NaT,2025-09-02,2026-08-28,NaT,31.031,20251201
1,2025-12-01,INV-00001,CUS-00436,94300.16,0.0755,94300.16,19.7768,VIGENTE,False,SIN RENOVACION,,NaT,2025-09-03,2026-08-29,NaT,19.7768,20251201
2,2025-12-01,INV-00002,CUS-00349,38779.12,0.0732,38779.12,7.8851,VIGENTE,False,SIN RENOVACION,,NaT,2025-11-15,2026-05-14,NaT,7.8851,20251201
3,2025-12-01,INV-00003,CUS-00271,8538.11,0.0644,8538.11,1.5274,VIGENTE,False,AUTOMATICA,SOLO CAPITAL,2026-05-13,2025-11-14,2026-05-13,NaT,1.5274,20251201
4,2025-12-01,INV-00004,CUS-00107,131164.34,0.0698,131164.34,25.4313,VIGENTE,False,POR VENTANILLA,CAPITAL + INTERES,2025-12-09,2025-06-12,2025-12-09,NaT,25.4313,20251201


## Paso 7: Exportaci√≥n a CSV - Zona Raw

Guarda el dataset sin transformar en archivo CSV para auditor√≠a y respaldo.

**Destino:** `data/raw/zr_fake_pas_inversiones.csv`

**Prop√≥sito de la Zona Raw:**
- Mantener copia exacta de datos originales
- Facilitar auditor√≠a de cambios posteriores
- Permitir reintentos de procesamiento si falla la transformaci√≥n

In [421]:
from pathlib import Path

raw_dir = Path('../data/raw')
raw_dir.mkdir(parents=True, exist_ok=True)
csv_path = raw_dir / f"{nombre_tabla_zr_pas_inversiones}.csv"

df_fact_inversiones.to_csv(csv_path, index=False)
logger.info("Archivo generado exitosamente en Zona Raw: %s", csv_path)


2026-02-21 19:59:54,841 INFO etl_inversiones Archivo generado exitosamente en Zona Raw: ../data/raw/zr_fake_pas_inversiones.csv


## Paso 8: Funciones Auxiliares para Carga en PostgreSQL

Define dos funciones reutilizables para manejar la persistencia de datos en diferentes esquemas.

In [422]:
def get_sql_type(series):
    """
    Mapea tipos de datos de Pandas a tipos nativos de PostgreSQL.
    Dise√±ada para asegurar consistencia en la arquitectura de datos.
    """
    # 1. Manejo de Fechas (Prioridad por tu requerimiento de DATE vs TIMESTAMP)
    if "fecha" in series.name.lower() or pd.api.types.is_datetime64_any_dtype(series):
        return "DATE"
    
    # 2. Manejo de Booleanos (Convertidos a INT o BOOLEAN)
    if pd.api.types.is_bool_dtype(series):
        return "BOOLEAN"
    
    # 3. Manejo de Num√©ricos
    if pd.api.types.is_integer_dtype(series):
        return "INT"
    if pd.api.types.is_float_dtype(series):
        return "NUMERIC"
    
    # 4. Default para texto y otros
    return "VARCHAR"

### Funci√≥n 1: `get_sql_type(series)` - Mapeo de Tipos Pandas ‚Üí PostgreSQL

Convierte tipos de datos de Pandas a tipos nativos de PostgreSQL para garantizar compatibilidad.

**Mapeos:**
- Campos con "fecha" ‚Üí `DATE`
- Booleanos ‚Üí `BOOLEAN`
- Enteros ‚Üí `INT`
- Decimales ‚Üí `NUMERIC`
- Resto ‚Üí `VARCHAR`

---

### Funci√≥n 2: `cargar_datos_historicos_postgresql()`

Carga datos en PostgreSQL con soporte para particionamiento y reintentos autom√°ticos.

**Par√°metros:**
- `df`: DataFrame a cargar
- `nombre_esquema`: Schema destino (zr_pas, zc_pas, zp)
- `nombre_tabla`: Tabla destino
- `campo_particion`: Campo para particionar (per√≠odo)
- `primary_key_column`: Columna clave primaria

**L√≥gica:**
1. Verifica si la tabla padre existe en PostgreSQL
2. Si no existe: crea tabla con particionamiento por LIST
3. Para cada per√≠odo √∫nico en los datos:
   - Verifica si la partici√≥n existe
   - Si no existe: crea la partici√≥n
   - Si existe: **trunca para limpiar datos antiguos** (l√≥gica OVERWRITE)
4. Carga datos con `to_sql()` en modo append
5. Intenta definir clave primaria compuesta (PK + per√≠odo)

**Ventajas:**
- Idempotente: puede ejecutarse m√∫ltiples veces sin duplicar datos
- Optimizado: particionamiento mejora performance
- Robusto: manejo de errores con rollback

In [423]:
def cargar_datos_historicos_postgresql(df, nombre_esquema, nombre_tabla, campo_particion, primary_key_column):
    try:
        # VALIDACI√ìN PREVIA: ¬øExiste la columna en el DF?
        if campo_particion not in df.columns:
            raise KeyError(f"La columna de partici√≥n '{campo_particion}' no existe en el DataFrame. Columnas disponibles: {list(df.columns)}")

        with engine.connect() as con:
            # 1. VALIDAR SI LA TABLA PADRE EXISTE
            check_table = text(f"""
                SELECT EXISTS (
                    SELECT FROM information_schema.tables 
                    WHERE table_schema = '{nombre_esquema}' 
                    AND table_name = '{nombre_tabla}'
                );
            """)
            existe_tabla = con.execute(check_table).scalar()

            # 2. SI NO EXISTE, CREAR TABLA PADRE PARTICIONADA
            if not existe_tabla:
                print(f"üèóÔ∏è Creando tabla padre particionada: {nombre_esquema}.{nombre_tabla}")
                columnas_sql = ", ".join([f'"{col}" {get_sql_type(df[col])}' for col in df.columns])
                
                create_query = text(f"""
                    CREATE TABLE {nombre_esquema}.{nombre_tabla} (
                        {columnas_sql}
                    ) PARTITION BY LIST ("{campo_particion}");
                """)
                con.execute(create_query)
                con.commit()

            # 3. GESTI√ìN DE PARTICIONES (CON TRUNCATE PARA OVERWRITE)
            periodos_en_datos = df[campo_particion].unique()
            
            for periodo in periodos_en_datos:
                nombre_particion = f"{nombre_tabla}_p{periodo}"
                
                check_part = text(f"SELECT EXISTS (SELECT FROM pg_tables WHERE schemaname = '{nombre_esquema}' AND tablename = '{nombre_particion}');")
                existe_particion = con.execute(check_part).scalar()
                
                if not existe_particion:
                    print(f"üì¶ Creando nueva partici√≥n: {nombre_particion}")
                    con.execute(text(f'CREATE TABLE {nombre_esquema}.{nombre_particion} PARTITION OF {nombre_esquema}.{nombre_tabla} FOR VALUES IN (\'{periodo}\');'))
                    con.commit()
                else:
                    # L√≥gica de Overwrite: Limpiamos antes de cargar
                    print(f"üßπ Truncando partici√≥n existente: {nombre_particion}")
                    con.execute(text(f'TRUNCATE TABLE {nombre_esquema}.{nombre_particion} RESTART IDENTITY;'))
                    con.commit()

            # 4. CARGA DE DATOS
            df.to_sql(nombre_tabla, engine, schema=nombre_esquema, if_exists='append', index=False)
            
            # 5. PRIMARY KEY (Solo si no existe)
            try:
                pk_query = text(f'ALTER TABLE {nombre_esquema}.{nombre_tabla} ADD PRIMARY KEY ("{primary_key_column}", "{campo_particion}");')
                con.execute(pk_query)
                con.commit()
            except:
                con.rollback() 
                
        print(f"‚úÖ Carga exitosa en {nombre_esquema}.{nombre_tabla}")

    except Exception as e:
        print(f"‚ùå Error cr√≠tico en la carga hist√≥rica: {e}")

## Paso 9: Carga a PostgreSQL - Zona Raw

Inserta los datos sin transformar en la tabla particionada `zr_pas.zr_fake_pas_inversiones`.

**Resultado:**
- Tabla con 16,500 registros (550 inversiones √ó 30 d√≠as)
- Particionada por per√≠odo (YYYYMMDD)
- Clave primaria: (operacion_id, periodo)

In [424]:
# Ejecutar carga a Zona Raw
cargar_datos_historicos_postgresql(df_fact_inversiones,nombre_esquema_zr_pas,nombre_tabla_zr_pas_inversiones,'periodo','operacion_id')
print("‚úÖ Inversiones cargada exitosamente en PostgreSQL.")

üßπ Truncando partici√≥n existente: zr_fake_pas_inversiones_p20251201
üßπ Truncando partici√≥n existente: zr_fake_pas_inversiones_p20251202
üßπ Truncando partici√≥n existente: zr_fake_pas_inversiones_p20251203
üßπ Truncando partici√≥n existente: zr_fake_pas_inversiones_p20251204
üßπ Truncando partici√≥n existente: zr_fake_pas_inversiones_p20251205
üßπ Truncando partici√≥n existente: zr_fake_pas_inversiones_p20251206
üßπ Truncando partici√≥n existente: zr_fake_pas_inversiones_p20251207
üßπ Truncando partici√≥n existente: zr_fake_pas_inversiones_p20251208
üßπ Truncando partici√≥n existente: zr_fake_pas_inversiones_p20251209
üßπ Truncando partici√≥n existente: zr_fake_pas_inversiones_p20251210
üßπ Truncando partici√≥n existente: zr_fake_pas_inversiones_p20251211
üßπ Truncando partici√≥n existente: zr_fake_pas_inversiones_p20251212
üßπ Truncando partici√≥n existente: zr_fake_pas_inversiones_p20251213
üßπ Truncando partici√≥n existente: zr_fake_pas_inversiones_p20251214
üßπ T

## Paso 10: Funci√≥n de Lectura de Datos Particionados

Define `leer_rango_particionado()` para extraer datos filtrando por rango de fechas de forma eficiente.

**Par√°metros:**
- `nombre_esquema`: Schema a consultar
- `nombre_tabla`: Tabla particionada
- `campo_fecha`: Campo para filtrar (per√≠odo)
- `fecha_inicio`: Fecha inicial (YYYYMMDD)
- `fecha_fin`: Fecha final (YYYYMMDD)

**Ventajas:**
- Explota particiones para ejecutar consultas r√°pidas
- Retorna DataFrame vac√≠o si no hay datos (en lugar de error)
- Manejo robusto de excepciones con logs descriptivos

In [425]:
def leer_rango_particionado(nombre_esquema, nombre_tabla, campo_fecha, fecha_inicio, fecha_fin):
    try:
        metadata = MetaData()
        
        # 1. Reflejar la tabla
        # Aseg√∫rate de que 'engine' est√© definido globalmente
        tabla = Table(
            nombre_tabla, 
            metadata, 
            autoload_with=engine, 
            schema=nombre_esquema
        )

        # 2. Construir la consulta con el rango de fechas
        # Usamos corchetes tabla.c[campo_fecha] para que sea din√°mico
        stmt = select(tabla).where(
            and_(
                tabla.c[campo_fecha] >= fecha_inicio,
                tabla.c[campo_fecha] <= fecha_fin
            )
        )

        # 3. Ejecutar la consulta
        with engine.connect() as conn:
            df_resultado = pd.read_sql(stmt, conn)
        
        if df_resultado.empty:
            print(f"‚ö†Ô∏è La consulta no devolvi√≥ datos para el rango {fecha_inicio} - {fecha_fin}")
            # Retornamos un DF vac√≠o en lugar de None para evitar el error .head()
            return pd.DataFrame() 
            
        return df_resultado

    except Exception as e:
        # Imprimimos el error real para debuggear
        print(f"‚ùå Error detallado al consultar: {str(e)}")
        return pd.DataFrame() # Retornar DF vac√≠o es m√°s seguro que None


## Paso 11: Extracci√≥n de Zona Raw

Obtiene los datos de inversiones desde la tabla particionada en la Zona Raw, filtrando por el per√≠odo definido (noviembre 2025).

**Rango:** 20251101 a 20251130
**Resultado:** DataFrame con inversiones del mes para procesar.

In [426]:
df_inversiones = leer_rango_particionado(
    nombre_esquema=nombre_esquema_zr_pas,
    nombre_tabla=nombre_tabla_zr_pas_inversiones,
    campo_fecha='periodo', # Aseg√∫rate que 'periodo' exista en la tabla
    fecha_inicio=P_FECHA_INCIO,
    fecha_fin=P_FECHA_FIN
)

df_inversiones.head()

Unnamed: 0,fecha_proceso,operacion_id,codigoIdentificacionCliente,monto_apertura,tasa_apertura,monto_actual,interes_dia,estado,es_pignorado,tipo_renovacion,instruccion_renovacion,fecha_renovacion,fecha_apertura,fecha_vencimiento,fecha_cancelacion_real,interes_acumulado_mes,codigoPeriodo,fecha_ingesta,periodo
0,2025-12-01,INV-00000,CUS-00103,133466.77,0.0837,133466.77,31.031,PIGNORADO,True,SIN RENOVACION,,NaT,2025-09-02,2026-08-28,NaT,31.031,,,20251201
1,2025-12-01,INV-00001,CUS-00436,94300.16,0.0755,94300.16,19.7768,VIGENTE,False,SIN RENOVACION,,NaT,2025-09-03,2026-08-29,NaT,19.7768,,,20251201
2,2025-12-01,INV-00002,CUS-00349,38779.12,0.0732,38779.12,7.8851,VIGENTE,False,SIN RENOVACION,,NaT,2025-11-15,2026-05-14,NaT,7.8851,,,20251201
3,2025-12-01,INV-00003,CUS-00271,8538.11,0.0644,8538.11,1.5274,VIGENTE,False,AUTOMATICA,SOLO CAPITAL,2026-05-13,2025-11-14,2026-05-13,NaT,1.5274,,,20251201
4,2025-12-01,INV-00004,CUS-00107,131164.34,0.0698,131164.34,25.4313,VIGENTE,False,POR VENTANILLA,CAPITAL + INTERES,2025-12-09,2025-06-12,2025-12-09,NaT,25.4313,,,20251201


## Paso 12: Transformaci√≥n a Zona Curada (ZC)

Normaliza los datos aplicando reglas de negocio y estandarizaci√≥n de nomenclatura bancaria.

**Transformaciones aplicadas:**

**1. Mapeo de Columnas (Nomenclatura bancaria):**
- `operacion_id` ‚Üí `numeroInversion`
- `fecha_proceso` ‚Üí `fechaProceso`
- `tasa_apertura` ‚Üí `tasaAperturaInversion`
- (Ver diccionario completo en el c√≥digo)

**2. Limpieza de Datos:**
- Elimina espacios en blanco de strings
- Convierte booleanos (True/False) a n√∫meros (1/0)

**3. Transformaciones Num√©ricas:**
- Tasas: multiplica por 100 para convertir a porcentaje
- Redondeo a 2 decimales para evitar artefactos de coma flotante

**4. Transformaciones de Fechas:**
- Convierte a formato DATE (YYYY-MM-DD)
- Maneja NULL en fechas de cancelaci√≥n y renovaci√≥n

**5. Valores por Defecto:**
- Reemplaza 'NA' por 'NO APLICA' en detalles de renovaci√≥n
- Agrega `fechaIngesta` para auditor√≠a

**6. Reordenamiento de Columnas:**
- Establece orden l√≥gico: per√≠odo ‚Üí fecha ‚Üí cliente ‚Üí inversi√≥n ‚Üí montos ‚Üí estados ‚Üí fechas

**Resultado:** DataFrame limpio, normalizado y con nomenclatura est√°ndar bancaria.

In [427]:
# 1. Diccionario de mapeo: {'nombre_actual': 'nuevo_nombre'}
mapping = {
    'fecha_proceso': 'fechaProceso', 
    'operacion_id': 'numeroInversion',
    'codigoIdentificacionCliente': 'codigoIdentificacionCliente',
    'monto_apertura': 'montoAperturaInversion',
    'tasa_apertura': 'tasaAperturaInversion',
    'monto_actual': 'montoActualInversion',
    'interes_dia': 'interesDiaInversion',
    'estado': 'estadoInversion',
    'es_pignorado': 'esPignoradoInversion',
    'tipo_renovacion': 'tipoRenovacionInversion',
    'instruccion_renovacion': 'detalleRenovacionInversion',
    'fecha_renovacion': 'fechaRenovacionInversion',
    'fecha_apertura': 'fechaAperturaInversion',
    'fecha_vencimiento': 'fechaVencimientoInversion',
    'fecha_cancelacion_real': 'fechaCancelacionInversion',
    'interes_acumulado_mes': 'interesAcumuladoMesInversion',
    'periodo': 'periodo'
}

# 2. Aplicamos el cambio
df_inversiones = df_inversiones.rename(columns=mapping)

# 3. Limpiar espacios en blanco adicionales (Best Practice)
df_inversiones['numeroInversion'] = df_inversiones['numeroInversion'].str.strip()
df_inversiones['estadoInversion'] = df_inversiones['estadoInversion'].str.strip()
df_inversiones['tipoRenovacionInversion'] = df_inversiones['tipoRenovacionInversion'].str.strip()
df_inversiones['detalleRenovacionInversion'] = df_inversiones['detalleRenovacionInversion'].str.strip()

# 4. Convertir True/False a 1/0 en la misma columna
df_inversiones['esPignoradoInversion'] = df_inversiones['esPignoradoInversion'].astype(int)

# 5. Multiplicar por 100 para obtener el porcentaje
df_inversiones['tasaAperturaInversion'] = df_inversiones['tasaAperturaInversion'] * 100

# 6. Redondear a 2 o 4 decimales para evitar residuos de coma flotante
df_inversiones['tasaAperturaInversion'] = df_inversiones['tasaAperturaInversion'].round(2)

# 7. Aseguramos que la columna sea de tipo datetime
df_inversiones['fechaProceso'] = pd.to_datetime(df_inversiones['fechaProceso'])

# 8. Creamos la columna 'periodo' en formato yyyyMM
df_inversiones['codigoPeriodo'] = df_inversiones['fechaProceso'].dt.strftime('%Y%m')

# 9. Si prefieres que sea num√©rico en lugar de texto:
df_inversiones['codigoPeriodo'] = df_inversiones['codigoPeriodo'].astype(int)

# 10. Aseguramos que la columna sea de tipo datetime (si a√∫n no lo es)
df_inversiones['fechaCancelacionInversion'] = pd.to_datetime(df_inversiones['fechaCancelacionInversion'])

# 11. Convertimos a formato fecha (YYYY-MM-DD)
# Esto transforma 2025-10-12 00:00:00 en 2025-10-12
df_inversiones['fechaCancelacionInversion'] = df_inversiones['fechaCancelacionInversion'].dt.date

# 12. Asegurar que sea tipo datetime primero
df_inversiones['fechaRenovacionInversion'] = pd.to_datetime(df_inversiones['fechaRenovacionInversion'])

# 13. Reemplazar NaT por None (que se traduce como NULL en PostgreSQL)
#df_inversiones['fechaRenovacionInversion'] = df_inversiones['fechaRenovacionInversion'].replace({pd.NaT: None})
df_inversiones['fechaRenovacionInversion'] = df_inversiones['fechaRenovacionInversion'].dt.date

# 14. Reemplazar NA por NO APLICA
df_inversiones['detalleRenovacionInversion'] = df_inversiones['detalleRenovacionInversion'].replace(
    ['NA', 'nan', 'None', 'NAN', 'nan', '', ' '], np.nan
)
df_inversiones['detalleRenovacionInversion'] = df_inversiones['detalleRenovacionInversion'].fillna('NO APLICA')

# 11. Campo auditoria fechaIngesta: El timestamp completo con hora, min, seg y microsegundos
df_inversiones['fechaIngesta'] = fechaActual

# 12. Ordenar columnas
columnas_finales = [
    'codigoPeriodo',
    'fechaProceso', 
    'numeroInversion',
    'codigoIdentificacionCliente',
    'montoAperturaInversion',
    'tasaAperturaInversion',
    'montoActualInversion',
    'interesDiaInversion',
    'estadoInversion',
    'esPignoradoInversion',
    'tipoRenovacionInversion',
    'detalleRenovacionInversion',
    'fechaRenovacionInversion',
    'fechaAperturaInversion',
    'fechaVencimientoInversion',
    'fechaCancelacionInversion',
    'interesAcumuladoMesInversion',
    'periodo',
    'fechaIngesta'
]

# 13. Reordenamos y filtramos el DataFrame
df_inversiones = df_inversiones[columnas_finales]

df_inversiones.head()

Unnamed: 0,codigoPeriodo,fechaProceso,numeroInversion,codigoIdentificacionCliente,montoAperturaInversion,tasaAperturaInversion,montoActualInversion,interesDiaInversion,estadoInversion,esPignoradoInversion,tipoRenovacionInversion,detalleRenovacionInversion,fechaRenovacionInversion,fechaAperturaInversion,fechaVencimientoInversion,fechaCancelacionInversion,interesAcumuladoMesInversion,periodo,fechaIngesta
0,202512,2025-12-01,INV-00000,CUS-00103,133466.77,8.37,133466.77,31.031,PIGNORADO,1,SIN RENOVACION,NO APLICA,NaT,2025-09-02,2026-08-28,NaT,31.031,20251201,2026-02-21 19:39:11.519180
1,202512,2025-12-01,INV-00001,CUS-00436,94300.16,7.55,94300.16,19.7768,VIGENTE,0,SIN RENOVACION,NO APLICA,NaT,2025-09-03,2026-08-29,NaT,19.7768,20251201,2026-02-21 19:39:11.519180
2,202512,2025-12-01,INV-00002,CUS-00349,38779.12,7.32,38779.12,7.8851,VIGENTE,0,SIN RENOVACION,NO APLICA,NaT,2025-11-15,2026-05-14,NaT,7.8851,20251201,2026-02-21 19:39:11.519180
3,202512,2025-12-01,INV-00003,CUS-00271,8538.11,6.44,8538.11,1.5274,VIGENTE,0,AUTOMATICA,SOLO CAPITAL,2026-05-13,2025-11-14,2026-05-13,NaT,1.5274,20251201,2026-02-21 19:39:11.519180
4,202512,2025-12-01,INV-00004,CUS-00107,131164.34,6.98,131164.34,25.4313,VIGENTE,0,POR VENTANILLA,CAPITAL + INTERES,2025-12-09,2025-06-12,2025-12-09,NaT,25.4313,20251201,2026-02-21 19:39:11.519180


## Paso 13: Exportaci√≥n a CSV - Zona Curada

Guarda los datos transformados en archivo CSV.

**Destino:** `data/curada/zc_pas_inversiones.csv`

**Prop√≥sito:**
- Respaldo de datos curados
- Verificaci√≥n manual de transformaciones
- Punto de referencia para auditor√≠a

In [428]:
# Guardar clientes en los archivos de Zona Curada
df_inversiones.to_csv(f'../data/curada/{nombre_tabla_zc_pas_inversiones}.csv', index=False)
print("‚úÖ Archivo generado exitosamente en Zona curada.")


‚úÖ Archivo generado exitosamente en Zona curada.


## Paso 14: Carga a PostgreSQL - Zona Curada

Inserta los datos transformados en la tabla particionada `zc_pas.zc_pas_inversiones`.

**Tabla destino:** `zc_pas.zc_pas_inversiones`
**Particionamiento:** Por per√≠odo (YYYYMMDD)
**Clave Primaria:** (numeroInversion, periodo)

Esta es la zona de an√°lisis intermedia con datos ya normalizados.

In [429]:
# Ejecutar carga a Zona Curada
cargar_datos_historicos_postgresql(df_inversiones,nombre_esquema_zc_pas,nombre_tabla_zc_pas_inversiones,'periodo','numeroInversion')
print("‚úÖ Inversiones cargada exitosamente en PostgreSQL.")

üßπ Truncando partici√≥n existente: zc_pas_inversiones_p20251201
üßπ Truncando partici√≥n existente: zc_pas_inversiones_p20251202
üßπ Truncando partici√≥n existente: zc_pas_inversiones_p20251203
üßπ Truncando partici√≥n existente: zc_pas_inversiones_p20251204
üßπ Truncando partici√≥n existente: zc_pas_inversiones_p20251205
üßπ Truncando partici√≥n existente: zc_pas_inversiones_p20251206
üßπ Truncando partici√≥n existente: zc_pas_inversiones_p20251207
üßπ Truncando partici√≥n existente: zc_pas_inversiones_p20251208
üßπ Truncando partici√≥n existente: zc_pas_inversiones_p20251209
üßπ Truncando partici√≥n existente: zc_pas_inversiones_p20251210
üßπ Truncando partici√≥n existente: zc_pas_inversiones_p20251211
üßπ Truncando partici√≥n existente: zc_pas_inversiones_p20251212
üßπ Truncando partici√≥n existente: zc_pas_inversiones_p20251213
üßπ Truncando partici√≥n existente: zc_pas_inversiones_p20251214
üßπ Truncando partici√≥n existente: zc_pas_inversiones_p20251215
üßπ Trunc

## Paso 15: Consolidaci√≥n - Inversiones + Clientes (TN)

Realiza un JOIN entre datos de inversiones y maestro de clientes para crear la tabla final productiva.

**Proceso:**
1. **Extrae inversiones de ZC:** Lee datos curados del per√≠odo de an√°lisis
2. **Prepara dimensi√≥n de clientes:** Selecciona 10 columnas clave del maestro
   - Identificaci√≥n (c√≥digo, tipo, n√∫mero)
   - Informaci√≥n personal (nombre)
   - Informaci√≥n crediticia (segmento, score)
   - Informaci√≥n de ubicaci√≥n (provincia, ciudad)
   - Fecha de registro
3. **Realiza JOIN:** Une inversiones con clientes por `codigoIdentificacionCliente`
   - Tipo: LEFT JOIN (conserva todas las inversiones)
   - Maneja colisiones de nombres con sufijo `_master`
4. **Selecciona columnas finales:** Reordena y filtra 25 columnas esenciales
5. **Agrega auditor√≠a:** Timestamp de ingesta

**Resultado:** `df_consolidado` con datos de inversi√≥n + contexto de cliente (forma anal√≠tica).

In [430]:
df_zc_inversiones = leer_rango_particionado(
    nombre_esquema=nombre_esquema_zc_pas,
    nombre_tabla=nombre_tabla_zc_pas_inversiones,
    campo_fecha='periodo', # Aseg√∫rate que 'periodo' exista en la tabla
    fecha_inicio=P_FECHA_INCIO,
    fecha_fin=P_FECHA_FIN
)

columnas_dimension = [
    'codigoSecuencialCliente',
    'codigoIdentificacionCliente',
    'tipoIdentificacionCliente',
    'numeroIdentificacionCliente',
    'nombreCompletoCliente',
    'segmentoCliente',
    'scoreCrediticioCliente',
    'provinciaCliente',
    'ciudadCliente',
    'fechaRegistroCliente'
]

df_clientes = df_clientes[columnas_dimension]

df_consolidado = pd.merge(
    df_zc_inversiones, 
    df_clientes, 
    on='codigoIdentificacionCliente', 
    how='left',
    suffixes=('', '_master') # Evita colisi√≥n si hay nombres de columnas iguales
)

# Verificar el resultado
print(f"Dimensiones del dataframe cruzado: {df_consolidado.shape}")

columnas_finales = [
    'codigoPeriodo',
    'fechaProceso',
    'codigoIdentificacionCliente',
    'tipoIdentificacionCliente',
    'numeroIdentificacionCliente',
    'nombreCompletoCliente',
    'segmentoCliente',
    'scoreCrediticioCliente',
    'provinciaCliente',
    'ciudadCliente',
    'fechaRegistroCliente',
    'numeroInversion',
    'montoAperturaInversion',
    'tasaAperturaInversion',
    'montoActualInversion',
    'interesDiaInversion',
    'estadoInversion',
    'esPignoradoInversion',
    'tipoRenovacionInversion',
    'detalleRenovacionInversion',
    'fechaRenovacionInversion',
    'fechaAperturaInversion',
    'fechaVencimientoInversion',
    'fechaCancelacionInversion',
    'interesAcumuladoMesInversion',
    'periodo'
]

df_consolidado = df_consolidado[columnas_finales]
df_consolidado['fechaIngesta'] = fechaActual
df_consolidado.head()


Dimensiones del dataframe cruzado: (27900, 28)


Unnamed: 0,codigoPeriodo,fechaProceso,codigoIdentificacionCliente,tipoIdentificacionCliente,numeroIdentificacionCliente,nombreCompletoCliente,segmentoCliente,scoreCrediticioCliente,provinciaCliente,ciudadCliente,...,esPignoradoInversion,tipoRenovacionInversion,detalleRenovacionInversion,fechaRenovacionInversion,fechaAperturaInversion,fechaVencimientoInversion,fechaCancelacionInversion,interesAcumuladoMesInversion,periodo,fechaIngesta
0,202512,2025-12-01,CUS-00103,C√âDULA,725858681,DR. EVA CURIEL,PYME,785,AZUAY,QUITO,...,1,SIN RENOVACION,NO APLICA,,2025-09-02,2026-08-28,,31.031,20251201,2026-02-21 19:39:11.519180
1,202512,2025-12-01,CUS-00436,C√âDULA,6924368178,NOEM√ç CASTELLANOS LINARES,WEALTH,961,LOJA,QUITO,...,0,SIN RENOVACION,NO APLICA,,2025-09-03,2026-08-29,,19.7768,20251201,2026-02-21 19:39:11.519180
2,202512,2025-12-01,CUS-00349,C√âDULA,628584636,CATALINA PILAR BALDERAS VILLA,WEALTH,824,MANAB√ç,GUAYAQUIL,...,0,SIN RENOVACION,NO APLICA,,2025-11-15,2026-05-14,,7.8851,20251201,2026-02-21 19:39:11.519180
3,202512,2025-12-01,CUS-00271,RUC,671785488001,PATRICIA ARANDA,RETAIL,523,GUAYAS,MANTA,...,0,AUTOMATICA,SOLO CAPITAL,2026-05-13,2025-11-14,2026-05-13,,1.5274,20251201,2026-02-21 19:39:11.519180
4,202512,2025-12-01,CUS-00107,C√âDULA,1254782922,ESTELA FELICIANO,WEALTH,717,GUAYAS,MANTA,...,0,POR VENTANILLA,CAPITAL + INTERES,2025-12-09,2025-06-12,2025-12-09,,25.4313,20251201,2026-02-21 19:39:11.519180


## Paso 16: Exportaci√≥n a CSV - Zona Productiva

Guarda los datos finales consolidados en archivo CSV.

**Destino:** `data/productiva/tn_pas_inversiones.csv`

**Contenido:** 
Dataset completo con informaci√≥n de inversiones + clientes (tabla anal√≠tica).

**Prop√≥sito:**
- Dataset final para an√°lisis y reportes
- Base para dashboards de inversiones
- Auditor√≠a de datos entregados

In [431]:
# Guardar clientes en los archivos de Zona Raw
df_consolidado.to_csv(f'../data/productiva/{nombre_tabla_zp_pas_inversiones}.csv', index=False)
print("‚úÖ Archivo generado exitosamente en Zona productiva.")

‚úÖ Archivo generado exitosamente en Zona productiva.


## Paso 17: Carga a PostgreSQL - Zona Productiva

Inserta el dataset final consolidado en la tabla `zp.tn_pas_inversiones` para consultas anal√≠ticas.

**Tabla destino:** `zp.tn_pas_inversiones` (Tabla de Hechos de Inversiones)
**Particionamiento:** Por per√≠odo (YYYYMMDD)
**Registros:** ~16,500 (inversiones √ó d√≠as)
**Dimensionalidad:** Incluye 10 columnas de cliente + 15 de inversi√≥n

Esta es la tabla consultable para:
- Reportes ejecutivos
- An√°lisis de rentabilidad
- Seguimiento de pignoraciones
- Control de renovaciones

In [432]:
# Ejecutar carga a Zona Productiva
cargar_datos_historicos_postgresql(df_consolidado,nombre_esquema_zp,nombre_tabla_zp_pas_inversiones,'periodo','numeroInversion')
print("‚úÖ Inversiones cargada exitosamente en PostgreSQL.")

üßπ Truncando partici√≥n existente: tn_pas_inversiones_p20251201
üßπ Truncando partici√≥n existente: tn_pas_inversiones_p20251202
üßπ Truncando partici√≥n existente: tn_pas_inversiones_p20251203
üßπ Truncando partici√≥n existente: tn_pas_inversiones_p20251204
üßπ Truncando partici√≥n existente: tn_pas_inversiones_p20251205
üßπ Truncando partici√≥n existente: tn_pas_inversiones_p20251206
üßπ Truncando partici√≥n existente: tn_pas_inversiones_p20251207
üßπ Truncando partici√≥n existente: tn_pas_inversiones_p20251208
üßπ Truncando partici√≥n existente: tn_pas_inversiones_p20251209
üßπ Truncando partici√≥n existente: tn_pas_inversiones_p20251210
üßπ Truncando partici√≥n existente: tn_pas_inversiones_p20251211
üßπ Truncando partici√≥n existente: tn_pas_inversiones_p20251212
üßπ Truncando partici√≥n existente: tn_pas_inversiones_p20251213
üßπ Truncando partici√≥n existente: tn_pas_inversiones_p20251214
üßπ Truncando partici√≥n existente: tn_pas_inversiones_p20251215
üßπ Trunc

## Paso 18: Limpieza y Liberaci√≥n de Recursos

Cierra conexiones de base de datos y libera memoria del kernel de Jupyter.

**Acciones:**
1. **Cierra conexiones PostgreSQL:**
   - Busca objetos con m√©todo `.close()` (conexiones DBAPI)
   - Busca objetos con m√©todo `.dispose()` (SQLAlchemy engines)

2. **Libera DataFrames:**
   - Identifica todos los DataFrames en memoria
   - Los elimina para liberar RAM

3. **Recolecci√≥n de basura:**
   - Ejecuta `gc.collect()` para optimizar memoria

**Prop√≥sito:** 
- Evitar memory leaks en Jupyter
- Permitir nuevas ejecuciones del notebook sin problemas
- Liberar conexiones a PostgreSQL para otros procesos

**Resultado:** "üßπ Memoria RAM optimizada"

In [433]:
print("--- Iniciando limpieza de recursos ---")

# 1. Cerrar conexiones a Bases de Datos
# Intenta cerrar objetos comunes de conexi√≥n si existen en el entorno global
possible_conn_names = ['conn', 'connection', 'db_conn', 'engine']

for name in possible_conn_names:
    if name in globals():
        obj = globals()[name]
        try:
            # Para conexiones tipo DBAPI (sqlite3, psycopg2, etc.)
            if hasattr(obj, 'close'):
                obj.close()
                print(f"Conexi√≥n '{name}' cerrada.")
            # Para engines de SQLAlchemy
            elif hasattr(obj, 'dispose'):
                obj.dispose()
                print(f"Engine '{name}' dispuesto.")
        except Exception as e:
            print(f"No se pudo cerrar '{name}': {e}")

print("üîå Conexi√≥n a PostgreSQL cerrada correctamente.")


# 2. Eliminar DataFrames para liberar memoria
# Busca todas las variables globales que sean DataFrames de pandas (y que no empiecen con _)
df_variables = [var for var, obj in globals().items() 
                if isinstance(obj, pd.DataFrame) and not var.startswith('_')]

if df_variables:
    print(f"Eliminando {len(df_variables)} DataFrames: {', '.join(df_variables)}")
    for var in df_variables:
        del globals()[var]
    print(f"üóëÔ∏è DataFrames eliminados de la memoria.")
else:
    print("No se encontraron DataFrames para eliminar.")

# 3. Forzar la recolecci√≥n de basura del sistema
gc.collect()
print("Recolecci√≥n de basura completada.")
print("üßπ Memoria RAM optimizada.")
print("--- Limpieza finalizada ---")


--- Iniciando limpieza de recursos ---
Conexi√≥n 'conn' cerrada.
Engine 'engine' dispuesto.
üîå Conexi√≥n a PostgreSQL cerrada correctamente.
Eliminando 5 DataFrames: df_clientes, df_fact_inversiones, df_inversiones, df_zc_inversiones, df_consolidado
üóëÔ∏è DataFrames eliminados de la memoria.
Recolecci√≥n de basura completada.
üßπ Memoria RAM optimizada.
--- Limpieza finalizada ---
