In [1]:
import logging
import time
from functools import wraps

# Configurar logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('etl_ecommerce.log'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger('etl_ecommerce')

def log_etapa(etapa):
    """Decorator para logging de etapas"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            logger.info(f"Iniciando {etapa}")
            start_time = time.time()
            
            try:
                result = func(*args, **kwargs)
                duration = time.time() - start_time
                logger.info(f"{etapa} completada en {duration:.2f}s")
                return result
            except Exception as e:
                duration = time.time() - start_time
                logger.error(f"{etapa} falló en {duration:.2f}s: {e}")
                raise e
        
        return wrapper
    return decorator

In [2]:
import pandas as pd
import numpy as np
from typing import Dict, Any

class ETLPipeline:
    def __init__(self):
        self.logger = logger
        self.errores = []
    
    @log_etapa("extracción de datos")
    def extract(self) -> pd.DataFrame:
        """Extraer datos con manejo de errores"""
        try:
            # Simular extracción (podría fallar)
            if np.random.random() < 0.1:  # 10% chance de error
                raise ConnectionError("Error de conexión a fuente de datos")
            
            # Datos de ejemplo
            datos = pd.DataFrame({
                'orden_id': range(1, 101),
                'cliente_id': np.random.randint(1, 21, 100),
                'producto': np.random.choice(['A', 'B', 'C', 'D'], 100),
                'cantidad': np.random.randint(1, 6, 100),
                'precio': np.round(np.random.uniform(10, 200, 100), 2)
            })
            
            self.logger.info(f"Extraídos {len(datos)} registros")
            return datos
            
        except Exception as e:
            self.errores.append(f"Extract: {e}")
            raise e
    
    @log_etapa("transformación de datos")
    def transform(self, datos: pd.DataFrame) -> pd.DataFrame:
        """Transformar datos con validaciones"""
        try:
            df = datos.copy()
            
            # Validar datos de entrada
            if df.empty:
                raise ValueError("No hay datos para transformar")
            
            # Transformaciones
            df['total'] = df['cantidad'] * df['precio']
            df['categoria_precio'] = pd.cut(
                df['precio'], 
                bins=[0, 50, 100, 200], 
                labels=['Bajo', 'Medio', 'Alto']
            )
            
            # Validar transformaciones
            if df['total'].isnull().any():
                raise ValueError("Transformación produjo valores nulos")
            
            self.logger.info(f"Transformados {len(df)} registros")
            return df
            
        except Exception as e:
            self.errores.append(f"Transform: {e}")
            raise e
    
    @log_etapa("carga de datos")
    def load(self, datos: pd.DataFrame) -> bool:
        """Cargar datos con verificación"""
        try:
            # Simular carga (podría fallar)
            if np.random.random() < 0.05:  # 5% chance de error
                raise Exception("Error de conexión a base de datos")
            
            # En producción: datos.to_sql('ventas', engine, if_exists='append')
            self.logger.info(f"Cargados {len(datos)} registros exitosamente")
            
            # Validar carga
            registros_esperados = len(datos)
            registros_cargados = len(datos)  # Simulado
            
            if registros_cargados != registros_esperados:
                raise ValueError(f"Carga incompleta: {registros_cargados}/{registros_esperados}")
            
            return True
            
        except Exception as e:
            self.errores.append(f"Load: {e}")
            raise e
    
    def ejecutar_pipeline(self) -> Dict[str, Any]:
        """Ejecutar pipeline completo con manejo de errores"""
        self.logger.info("Iniciando pipeline ETL completo")
        
        try:
            # Extract
            datos_crudo = self.extract()
            
            # Transform
            datos_transformados = self.transform(datos_crudo)
            
            # Load
            exito = self.load(datos_transformados)
            
            resultado = {
                'exito': True,
                'registros_procesados': len(datos_transformados),
                'errores': self.errores
            }
            
            self.logger.info("Pipeline ETL completado exitosamente")
            return resultado
            
        except Exception as e:
            self.logger.error(f"Pipeline ETL falló: {e}")
            
            return {
                'exito': False,
                'error_principal': str(e),
                'errores': self.errores
            }

In [4]:
# Ejecutar pipeline con diferentes escenarios
pipeline = ETLPipeline()

# Ejecución exitosa
resultado = pipeline.ejecutar_pipeline()

print(f"\nResultado del pipeline:")
print(f"Éxito: {resultado['exito']}")
if resultado['exito']:
    print(f"Registros procesados: {resultado['registros_procesados']}")
else:
    print(f"Error principal: {resultado['error_principal']}")

print(f"Errores registrados: {len(resultado['errores'])}")
for error in resultado['errores']:
    print(f"  - {error}")

# Ejecutar múltiples veces para probar robustez
resultados_multiples = []
for i in range(5):
    print(f"\n--- Ejecución {i+1} ---")
    pipeline_i = ETLPipeline()
    resultado_i = pipeline_i.ejecutar_pipeline()
    resultados_multiples.append(resultado_i['exito'])

exito_rate = sum(resultados_multiples) / len(resultados_multiples)
print(f"Tasa de éxito: {exito_rate:.1%}")

2026-01-08 20:37:54,797 - INFO - Iniciando pipeline ETL completo
2026-01-08 20:37:54,799 - INFO - Iniciando extracción de datos
2026-01-08 20:37:54,803 - INFO - Extraídos 100 registros
2026-01-08 20:37:54,805 - INFO - extracción de datos completada en 0.00s
2026-01-08 20:37:54,807 - INFO - Iniciando transformación de datos
2026-01-08 20:37:54,814 - INFO - Transformados 100 registros
2026-01-08 20:37:54,817 - INFO - transformación de datos completada en 0.01s
2026-01-08 20:37:54,820 - INFO - Iniciando carga de datos
2026-01-08 20:37:54,823 - INFO - Cargados 100 registros exitosamente
2026-01-08 20:37:54,825 - INFO - carga de datos completada en 0.00s
2026-01-08 20:37:54,826 - INFO - Pipeline ETL completado exitosamente
2026-01-08 20:37:54,831 - INFO - Iniciando pipeline ETL completo
2026-01-08 20:37:54,833 - INFO - Iniciando extracción de datos
2026-01-08 20:37:54,835 - ERROR - extracción de datos falló en 0.00s: Error de conexión a fuente de datos
2026-01-08 20:37:54,836 - ERROR - Pipe


Resultado del pipeline:
Éxito: True
Registros procesados: 100
Errores registrados: 0

--- Ejecución 1 ---

--- Ejecución 2 ---

--- Ejecución 3 ---

--- Ejecución 4 ---

--- Ejecución 5 ---
Tasa de éxito: 40.0%


In [5]:
import pandas as pd
import numpy as np
import logging
import random
from datetime import datetime

# ----------------------------
# Configuración de logging
# ----------------------------
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("etl_pipeline")

# ----------------------------
# Pipeline ETL
# ----------------------------
class ETLPipeline:

    def extract(self):
        logger.info("Iniciando extracción de datos")
        if random.random() < 0.2:
            raise Exception("Error de conexión a fuente de datos")

        df = pd.DataFrame({
            "orden_id": range(1, 101),
            "cliente_id": np.random.randint(1, 20, 100),
            "producto": np.random.choice(list("ABCD"), 100),
            "cantidad": np.random.randint(1, 6, 100),
            "precio": np.round(np.random.uniform(10, 200, 100), 2)
        })

        logger.info(f"Extraídos {len(df)} registros")
        return df

    def transform(self, df):
        logger.info("Iniciando transformación de datos")
        df = df.copy()
        df["total"] = df["cantidad"] * df["precio"]
        df["categoria_precio"] = df["total"].apply(
            lambda x: "Alto" if x >= 200 else "Bajo"
        )
        logger.info(f"Transformados {len(df)} registros")
        return df

    def load(self, df):
        logger.info("Iniciando carga de datos")
        if random.random() < 0.2:
            raise Exception("Error de conexión a base de datos")
        logger.info(f"Cargados {len(df)} registros exitosamente")

    def run(self):
        df = self.extract()
        df = self.transform(df)
        self.load(df)
        return df

# ----------------------------
# Ejecuciones del pipeline
# ----------------------------
pipeline = ETLPipeline()
ejecuciones = []
df_final = None

N_EJECUCIONES = 5

for i in range(1, N_EJECUCIONES + 1):
    try:
        df_final = pipeline.run()
        ejecuciones.append({
            "ejecucion": i,
            "timestamp": datetime.now(),
            "registros_procesados": len(df_final),
            "estado": "Éxito",
            "etapa_error": None,
            "detalle_error": None
        })
    except Exception as e:
        ejecuciones.append({
            "ejecucion": i,
            "timestamp": datetime.now(),
            "registros_procesados": 0,
            "estado": "Fallido",
            "etapa_error": "ETL",
            "detalle_error": str(e)
        })

df_ejecuciones = pd.DataFrame(ejecuciones)

# ----------------------------
# Resumen del ETL
# ----------------------------
total = len(df_ejecuciones)
exitos = (df_ejecuciones["estado"] == "Éxito").sum()
fallos = total - exitos
tasa_exito = round(exitos / total * 100, 2)

df_resumen = pd.DataFrame([{
    "total_ejecuciones": total,
    "ejecuciones_exitosas": exitos,
    "ejecuciones_fallidas": fallos,
    "tasa_exito_%": tasa_exito
}])

# ----------------------------
# Exportar evidencia a Excel
# ----------------------------
with pd.ExcelWriter("evidencia_etl_dia5.xlsx") as writer:
    if df_final is not None:
        df_final.to_excel(writer, sheet_name="datos_transformados", index=False)
    df_ejecuciones.to_excel(writer, sheet_name="ejecuciones_pipeline", index=False)
    df_resumen.to_excel(writer, sheet_name="resumen_etl", index=False)

print("✅ Evidencia generada: evidencia_etl_dia5.xlsx")
df_resumen

2026-01-08 20:41:50,640 - INFO - Iniciando extracción de datos
2026-01-08 20:41:50,642 - INFO - Iniciando extracción de datos
2026-01-08 20:41:50,646 - INFO - Extraídos 100 registros
2026-01-08 20:41:50,648 - INFO - Iniciando transformación de datos
2026-01-08 20:41:50,657 - INFO - Transformados 100 registros
2026-01-08 20:41:50,660 - INFO - Iniciando carga de datos
2026-01-08 20:41:50,661 - INFO - Cargados 100 registros exitosamente
2026-01-08 20:41:50,662 - INFO - Iniciando extracción de datos
2026-01-08 20:41:50,667 - INFO - Extraídos 100 registros
2026-01-08 20:41:50,671 - INFO - Iniciando transformación de datos
2026-01-08 20:41:50,676 - INFO - Transformados 100 registros
2026-01-08 20:41:50,678 - INFO - Iniciando carga de datos
2026-01-08 20:41:50,679 - INFO - Cargados 100 registros exitosamente
2026-01-08 20:41:50,681 - INFO - Iniciando extracción de datos
2026-01-08 20:41:50,684 - INFO - Extraídos 100 registros
2026-01-08 20:41:50,686 - INFO - Iniciando transformación de datos


✅ Evidencia generada: evidencia_etl_dia5.xlsx


Unnamed: 0,total_ejecuciones,ejecuciones_exitosas,ejecuciones_fallidas,tasa_exito_%
0,5,2,3,40.0
