In [1]:
# Setup
import sys
from pathlib import Path
import importlib

# Agregar directorio ra√≠z al path
project_root = Path.cwd().parent
sys.path.insert(0, str(project_root))

print(f"‚úÖ Project root: {project_root}")

‚úÖ Project root: D:\Academico\SENATI\octavo_ciclo\appBigData


In [2]:
# Imports y recargar m√≥dulos
from sqlalchemy import create_engine, text
from config.config import Config
from src.logger_config import get_logger
import pandas as pd

# Recargar m√≥dulos para tomar cambios
import src.validator
import src.staging

importlib.reload(src.validator)
importlib.reload(src.staging)

from src.validator import DataValidator
from src.staging import StagingProcessor

logger = get_logger('notebook_staging')
logger.info("üöÄ Iniciando proceso de staging y validaciones")

‚úÖ Configuraci√≥n cargada correctamente
[32m04:34:44 | notebook_staging | INFO[0m | üöÄ Iniciando proceso de staging y validaciones[0m


In [3]:
# Obtener √∫ltimo ETL ID
engine_staging = create_engine(Config.get_staging_connection_string())

query_etl = text("""
    SELECT etl_id, proceso, fecha_fin, estado
    FROM etl_control
    WHERE proceso LIKE 'EXTRACCION%'
    ORDER BY etl_id DESC
    LIMIT 1
""")

with engine_staging.connect() as conn:
    result = conn.execute(query_etl)
    ultimo_etl = result.fetchone()
    
    if ultimo_etl:
        etl_id = ultimo_etl[0]
        print(f"üìù Usando ETL ID: {etl_id}")
        print(f"   Proceso: {ultimo_etl[1]}")
        print(f"   Estado: {ultimo_etl[3]}")
    else:
        print("‚ö†Ô∏è  No hay ETL previo, usando ID None")
        etl_id = None

üìù Usando ETL ID: 4
   Proceso: EXTRACCION_COMPLETA
   Estado: COMPLETADO


In [4]:
# Inicializar validador y procesador
validator = DataValidator(etl_id=etl_id)
processor = StagingProcessor(etl_id=etl_id)

logger.info("‚úÖ Validador y procesador inicializados")

[32m04:34:47 | validator | INFO[0m | ‚úÖ Validador inicializado correctamente[0m
[32m04:34:47 | staging | INFO[0m | ‚úÖ Procesador de staging inicializado[0m
[32m04:34:47 | notebook_staging | INFO[0m | ‚úÖ Validador y procesador inicializados[0m


In [5]:
# FASE 1 - Validaciones PRE-limpieza
"""
Primero validamos los datos crudos para documentar el estado inicial
"""
print("\n" + "="*80)
print("üîç FASE 1: VALIDACIONES PRE-LIMPIEZA")
print("="*80)

resultados_pre = validator.ejecutar_validaciones_staging()

# Mostrar resumen
print("\nüìä Resumen de validaciones PRE-limpieza:")
for validacion, resultado in resultados_pre.items():
    status = "‚úÖ" if resultado else "‚ùå"
    print(f"{status} {validacion}")

tasa_exito_pre = sum(resultados_pre.values()) / len(resultados_pre) * 100
print(f"\nüìà Tasa de √©xito: {tasa_exito_pre:.1f}%")


üîç FASE 1: VALIDACIONES PRE-LIMPIEZA
[32m04:34:50 | validator | INFO[0m | üöÄ INICIO: VALIDACIONES[0m
[32m04:34:50 | validator | INFO[0m |    Validando calidad de datos en staging[0m
[32m04:34:50 | validator | INFO[0m | üîç Validando stg_rental...[0m
[32m04:34:50 | validator | INFO[0m | ‚úÖ Sin duplicados en stg_rental[0m
[32m04:34:50 | validator | INFO[0m | ‚úÖ Todas las columnas requeridas completas en stg_rental[0m
[32m04:34:50 | validator | INFO[0m | üîç Validando stg_payment...[0m
[32m04:34:50 | validator | INFO[0m | ‚úÖ Sin duplicados en stg_payment[0m
[32m04:34:51 | validator | INFO[0m | ‚úÖ Todas las columnas requeridas completas en stg_payment[0m
[32m04:34:51 | validator | INFO[0m | ‚úÖ Valores en rango v√°lido: stg_payment.amount[0m
[32m04:34:51 | validator | INFO[0m | üîç Validando stg_film...[0m
[32m04:34:51 | validator | INFO[0m | ‚úÖ Sin duplicados en stg_film[0m
[32m04:34:51 | validator | INFO[0m | ‚úÖ Valores en rango v√°lido: s

In [6]:
#  Agregar columnas de validaci√≥n a tablas staging
from sqlalchemy import text

print("\n" + "="*80)
print("üîß PREPARACI√ìN: Agregar columnas de validaci√≥n")
print("="*80)

tablas_con_validacion = ['stg_rental', 'stg_payment', 'stg_film']

with engine_staging.connect() as conn:
    for tabla in tablas_con_validacion:
        try:
            # Agregar columna es_valido
            query1 = text(f"""
                ALTER TABLE {tabla}
                ADD COLUMN es_valido BOOLEAN DEFAULT TRUE
            """)
            conn.execute(query1)
            conn.commit()
            print(f"‚úÖ Agregada columna es_valido a {tabla}")
        except Exception as e:
            if '1060' in str(e):  # Columna ya existe
                print(f"‚ö†Ô∏è  {tabla}.es_valido ya existe")
            else:
                print(f"‚ùå Error en {tabla}.es_valido: {e}")
        
        try:
            # Agregar columna mensaje_validacion
            query2 = text(f"""
                ALTER TABLE {tabla}
                ADD COLUMN mensaje_validacion VARCHAR(255) NULL
            """)
            conn.execute(query2)
            conn.commit()
            print(f"‚úÖ Agregada columna mensaje_validacion a {tabla}")
        except Exception as e:
            if '1060' in str(e):  # Columna ya existe
                print(f"‚ö†Ô∏è  {tabla}.mensaje_validacion ya existe")
            else:
                print(f"‚ùå Error en {tabla}.mensaje_validacion: {e}")

print("\n‚úÖ Columnas de validaci√≥n agregadas")


üîß PREPARACI√ìN: Agregar columnas de validaci√≥n
‚úÖ Agregada columna es_valido a stg_rental
‚úÖ Agregada columna mensaje_validacion a stg_rental
‚úÖ Agregada columna es_valido a stg_payment
‚úÖ Agregada columna mensaje_validacion a stg_payment
‚úÖ Agregada columna es_valido a stg_film
‚úÖ Agregada columna mensaje_validacion a stg_film

‚úÖ Columnas de validaci√≥n agregadas


In [7]:
# FASE 2 - Limpieza y Transformaciones
"""
Procesamos las tablas para limpiar y transformar datos
"""
print("\n" + "="*80)
print("üîß FASE 2: LIMPIEZA Y TRANSFORMACIONES")
print("="*80)

stats_procesamiento = processor.procesar_todas_las_tablas()

# Mostrar estad√≠sticas
print("\nüìä Estad√≠sticas de procesamiento:")
for tabla, stats in stats_procesamiento.items():
    print(f"\n{tabla}:")
    for metric, valor in stats.items():
        if valor > 0:
            print(f"   {metric}: {valor}")


üîß FASE 2: LIMPIEZA Y TRANSFORMACIONES
[32m04:35:03 | staging | INFO[0m | üöÄ INICIO: PROCESAMIENTO_STAGING[0m
[32m04:35:03 | staging | INFO[0m |    Limpieza y transformaciones en staging[0m
[32m04:35:03 | staging | INFO[0m | üîß Procesando stg_rental...[0m
[32m04:38:21 | staging | INFO[0m | ‚úÖ Sin duplicados en stg_rental[0m
[32m04:38:21 | staging | INFO[0m | ‚úÖ stg_rental procesado: {'duplicados': 0, 'fecha_invalida': 0, 'return_invalido': 0}[0m
[32m04:38:21 | staging | INFO[0m | üîß Procesando stg_payment...[0m
[32m04:41:37 | staging | INFO[0m | ‚úÖ Sin duplicados en stg_payment[0m
[32m04:41:37 | staging | INFO[0m | ‚úÖ stg_payment procesado: {'duplicados': 0, 'monto_negativo': 0, 'monto_excesivo': 0, 'fecha_invalida': 0}[0m
[32m04:41:37 | staging | INFO[0m | üîß Procesando stg_film...[0m
[32m04:41:38 | staging | INFO[0m | ‚úÖ Sin duplicados en stg_film[0m
[32m04:41:38 | staging | INFO[0m | ‚úÖ stg_film procesado: {'duplicados': 0, 'normaliza

In [8]:
# FASE 3 - Validaciones POST-limpieza
"""
Validamos nuevamente para verificar mejoras
"""
print("\n" + "="*80)
print("üîç FASE 3: VALIDACIONES POST-LIMPIEZA")
print("="*80)

resultados_post = validator.ejecutar_validaciones_staging()

# Mostrar resumen
print("\nüìä Resumen de validaciones POST-limpieza:")
for validacion, resultado in resultados_post.items():
    status = "‚úÖ" if resultado else "‚ùå"
    cambio = ""
    if validacion in resultados_pre:
        if resultado and not resultados_pre[validacion]:
            cambio = " (MEJOR√ì ‚Üë)"
        elif not resultado and resultados_pre[validacion]:
            cambio = " (EMPEOR√ì ‚Üì)"
    print(f"{status} {validacion}{cambio}")

tasa_exito_post = sum(resultados_post.values()) / len(resultados_post) * 100
print(f"\nüìà Tasa de √©xito POST: {tasa_exito_post:.1f}%")
print(f"üìà Mejora: {tasa_exito_post - tasa_exito_pre:+.1f}%")


üîç FASE 3: VALIDACIONES POST-LIMPIEZA
[32m04:42:48 | validator | INFO[0m | üöÄ INICIO: VALIDACIONES[0m
[32m04:42:48 | validator | INFO[0m |    Validando calidad de datos en staging[0m
[32m04:42:48 | validator | INFO[0m | üîç Validando stg_rental...[0m
[32m04:42:48 | validator | INFO[0m | ‚úÖ Sin duplicados en stg_rental[0m
[32m04:42:49 | validator | INFO[0m | ‚úÖ Todas las columnas requeridas completas en stg_rental[0m
[32m04:42:49 | validator | INFO[0m | üîç Validando stg_payment...[0m
[32m04:42:49 | validator | INFO[0m | ‚úÖ Sin duplicados en stg_payment[0m
[32m04:42:49 | validator | INFO[0m | ‚úÖ Todas las columnas requeridas completas en stg_payment[0m
[32m04:42:49 | validator | INFO[0m | ‚úÖ Valores en rango v√°lido: stg_payment.amount[0m
[32m04:42:49 | validator | INFO[0m | üîç Validando stg_film...[0m
[32m04:42:49 | validator | INFO[0m | ‚úÖ Sin duplicados en stg_film[0m
[32m04:42:49 | validator | INFO[0m | ‚úÖ Valores en rango v√°lido: 

In [9]:
# Ver registros inv√°lidos en detalle
"""
Examinar qu√© registros fueron marcados como inv√°lidos
"""
print("\n" + "="*80)
print("üìã REGISTROS INV√ÅLIDOS DETECTADOS")
print("="*80)

# Rental inv√°lidos
query_rental_inv = """
    SELECT rental_id, rental_date, return_date, mensaje_validacion
    FROM stg_rental
    WHERE es_valido = FALSE
    LIMIT 10
"""

df_rental_inv = pd.read_sql(query_rental_inv, engine_staging)
if len(df_rental_inv) > 0:
    print(f"\n‚ö†Ô∏è  stg_rental - {len(df_rental_inv)} registros inv√°lidos (mostrando 10):")
    display(df_rental_inv)
else:
    print("\n‚úÖ stg_rental - Todos los registros son v√°lidos")

# Payment inv√°lidos
query_payment_inv = """
    SELECT payment_id, amount, payment_date, mensaje_validacion
    FROM stg_payment
    WHERE es_valido = FALSE
    LIMIT 10
"""

df_payment_inv = pd.read_sql(query_payment_inv, engine_staging)
if len(df_payment_inv) > 0:
    print(f"\n‚ö†Ô∏è  stg_payment - {len(df_payment_inv)} registros inv√°lidos (mostrando 10):")
    display(df_payment_inv)
else:
    print("\n‚úÖ stg_payment - Todos los registros son v√°lidos")


üìã REGISTROS INV√ÅLIDOS DETECTADOS

‚úÖ stg_rental - Todos los registros son v√°lidos

‚úÖ stg_payment - Todos los registros son v√°lidos


In [10]:
# Ver auditor√≠a de calidad
"""
Revisar todas las validaciones registradas en audit_calidad
"""
query_audit = text("""
    SELECT 
        validacion,
        resultado,
        tabla_origen,
        COUNT(*) as veces
    FROM audit_calidad
    WHERE etl_id = :etl_id
    GROUP BY validacion, resultado, tabla_origen
    ORDER BY resultado DESC, validacion
""")

with engine_staging.connect() as conn:
    df_audit = pd.read_sql(query_audit, conn, params={"etl_id": etl_id})

print("\nüìã Auditor√≠a de Calidad:")
display(df_audit)


üìã Auditor√≠a de Calidad:


Unnamed: 0,validacion,resultado,tabla_origen,veces
0,Duplicados PK,PASS,stg_rental,3
1,Duplicados PK,PASS,stg_payment,3
2,Duplicados PK,PASS,stg_film,3
3,Integridad Referencial,PASS,stg_rental,3
4,Integridad Referencial,PASS,stg_inventory,3
5,Integridad Referencial,PASS,stg_store,3
6,Nulos en amount,PASS,stg_payment,3
7,Nulos en customer_id,PASS,stg_rental,3
8,Nulos en customer_id,PASS,stg_payment,3
9,Nulos en inventory_id,PASS,stg_rental,3


In [11]:
# Obtener datos limpios para siguiente fase
"""
Extraer solo registros v√°lidos para la fase de transformaci√≥n
"""
print("\n" + "="*80)
print("üì¶ DATOS LIMPIOS LISTOS PARA TRANSFORMACI√ìN")
print("="*80)

# Contar registros v√°lidos por tabla
tablas_staging = ['stg_rental', 'stg_payment', 'stg_film', 'stg_inventory',
                 'stg_category', 'stg_store', 'stg_city', 'stg_country']

print("\nüìä Registros v√°lidos por tabla:")
for tabla in tablas_staging:
    df_validos = processor.obtener_registros_validos(tabla)
    print(f"   {tabla}: {len(df_validos):,} registros")


üì¶ DATOS LIMPIOS LISTOS PARA TRANSFORMACI√ìN

üìä Registros v√°lidos por tabla:
[32m04:43:18 | staging | INFO[0m | üìä 16,044 registros v√°lidos en stg_rental[0m
   stg_rental: 16,044 registros
[32m04:43:18 | staging | INFO[0m | üìä 16,044 registros v√°lidos en stg_payment[0m
   stg_payment: 16,044 registros
[32m04:43:18 | staging | INFO[0m | üìä 1,000 registros v√°lidos en stg_film[0m
   stg_film: 1,000 registros
[32m04:43:18 | staging | INFO[0m | üìä 4,581 registros v√°lidos en stg_inventory[0m
   stg_inventory: 4,581 registros
[32m04:43:18 | staging | INFO[0m | üìä 16 registros v√°lidos en stg_category[0m
   stg_category: 16 registros
[32m04:43:18 | staging | INFO[0m | üìä 2 registros v√°lidos en stg_store[0m
   stg_store: 2 registros
[32m04:43:18 | staging | INFO[0m | üìä 600 registros v√°lidos en stg_city[0m
   stg_city: 600 registros
[32m04:43:18 | staging | INFO[0m | üìä 109 registros v√°lidos en stg_country[0m
   stg_country: 109 registros


In [12]:
# Cerrar conexiones
validator.cerrar_conexion()
processor.cerrar_conexion()
logger.info("‚úÖ Proceso de staging completado")

print("\n" + "="*80)
print("üéâ ¬°STAGING Y VALIDACIONES COMPLETADAS!")
print("="*80)
print("\nüìå Siguiente paso: 03_transformacion.ipynb (Crear modelo estrella)")

[32m04:43:29 | validator | INFO[0m | üîå Conexi√≥n cerrada[0m
[32m04:43:29 | staging | INFO[0m | üîå Conexi√≥n cerrada[0m
[32m04:43:29 | notebook_staging | INFO[0m | ‚úÖ Proceso de staging completado[0m

üéâ ¬°STAGING Y VALIDACIONES COMPLETADAS!

üìå Siguiente paso: 03_transformacion.ipynb (Crear modelo estrella)
