# Proceso ETL: Una Guía Práctica con Python

## 📚 Objetivos de Aprendizaje

Al completar este notebook, los estudiantes serán capaces de:
- Comprender los conceptos fundamentales de ETL (Extract, Transform, Load)
- Implementar cada fase del proceso ETL usando Python y pandas
- Aplicar técnicas de limpieza y transformación de datos
- Manejar errores comunes en pipelines de datos
- Diseñar pipelines ETL escalables y mantenibles

## 🎯 ¿Qué es ETL?

**ETL** es un proceso fundamental en ingeniería de datos que permite:
- **Extract (Extraer)**: Obtener datos de múltiples fuentes
- **Transform (Transformar)**: Limpiar, validar y estructurar los datos
- **Load (Cargar)**: Almacenar los datos procesados en el destino final

Este notebook es una guía completa que cubre desde conceptos básicos hasta temas avanzados, con ejemplos prácticos y ejercicios para trabajar con pipelines ETL en entornos reales de ingeniería de datos.

## El Pipeline ETL de Referencia

Nos basaremos en la siguiente estructura de pipeline, que representa un flujo ETL clásico:

In [21]:
def etl_pipeline():
    # 1. Extraer datos desde una fuente original
    raw_data = extract_from_database()
    
    # 2. Transformar los datos aplicando reglas y limpiando
    cleaned_data = apply_business_rules(raw_data)
    
    # 3. Normalizar el esquema para que sea consistente
    structured_data = normalize_schema(cleaned_data)
    
    # 4. Cargar los datos transformados a su destino final
    load_to_warehouse(structured_data)

A continuación, implementaremos cada una de estas funciones con ejemplos claros.

## 📦 Librerías y Configuración Inicial

Para nuestros ejemplos, usaremos las siguientes librerías esenciales en ingeniería de datos:

In [None]:
import pandas as pd          # Manipulación y análisis de datos
import sqlite3              # Base de datos SQL ligera
import numpy as np          # Operaciones numéricas
import json                 # Manejo de datos JSON
import logging              # Sistema de logs
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

# Configuración de logging para monitorear el pipeline
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

## 🧠 Conceptos Fundamentales de ETL

### Tipos de Procesamiento ETL

1. **Batch Processing (Procesamiento por lotes)**
   - Procesa grandes volúmenes de datos en intervalos programados
   - Ideal para reportes diarios, semanales o mensuales
   - Ejemplo: Procesar todas las transacciones del día anterior

2. **Real-time Processing (Procesamiento en tiempo real)**
   - Procesa datos tan pronto como llegan
   - Crítico para aplicaciones que requieren respuesta inmediata
   - Ejemplo: Detección de fraude en transacciones

3. **Near Real-time Processing (Procesamiento casi en tiempo real)**
   - Procesa datos con un pequeño retraso (segundos o minutos)
   - Balance entre velocidad y eficiencia de recursos
   - Ejemplo: Actualización de dashboards cada 5 minutos

### Calidad de Datos - Las 6 Dimensiones

1. **Exactitud**: Los datos reflejan la realidad
2. **Completitud**: No hay valores faltantes críticos
3. **Consistencia**: Los datos son coherentes entre sistemas
4. **Validez**: Los datos cumplen con reglas de negocio
5. **Unicidad**: No hay duplicados no deseados
6. **Actualidad**: Los datos están actualizados

## 1. Extract: Extracción de Datos

### 🎯 Objetivos de la Fase Extract
- Conectar con múltiples fuentes de datos
- Extraer datos de manera eficiente
- Manejar diferentes formatos y protocolos
- Implementar estrategias de extracción incremental

### 📊 Fuentes de Datos Comunes
- **Bases de datos relacionales**: MySQL, PostgreSQL, SQL Server
- **Bases de datos NoSQL**: MongoDB, Cassandra, Redis
- **Archivos**: CSV, JSON, XML, Parquet
- **APIs REST**: Servicios web y microservicios
- **Sistemas de streaming**: Kafka, Kinesis

**Ejemplo:** Vamos a simular la extracción de datos de una base de datos SQL. Crearemos una pequeña base de datos en memoria con `sqlite3` y luego la leeremos usando `pandas`.

In [36]:
def extract_from_database():
    """
    Simula la extracción de datos desde una base de datos.
    En un entorno real, esto se conectaría a una base de datos real.
    
    Returns:
        pd.DataFrame: Datos crudos extraídos
    """
    try:
        logger.info("Iniciando extracción de datos...")
        
        # Simulamos datos con problemas típicos de calidad
        data = {
            'ID_USER': [101, 102, 103, 104, 105, 106],
            'user_name': ['Ana', 'Luis', 'Marta', 'Juan', 'Eva', None],  # Valor nulo
            'registration_date': ['2025-01-15', '2025-02-20', '2025-03-01', '2025-04-10', '2025-05-19', '2025-06-30'],
            'total_spent': [150.5, 80.0, -999, 200.0, 45.25, 'invalid'],  # Valor inválido
            'country_code': ['ES', 'MX', 'ES', 'CO', 'es', 'ES'],  # Inconsistencia en mayúsculas
            'email': ['ana@email.com', 'luis@email.com', 'marta@email.com', 'juan@email.com', 'eva@email.com', 'pedro@email.com']
        }
        
        df = pd.DataFrame(data)
        
        print("--- 1. Datos Crudos Extraídos ---")
        print(f"Registros extraídos: {len(df)}")
        print(f"Columnas: {list(df.columns)}")
        print("\nPrimeras filas:")
        print(df)
        print("\nInformación del DataFrame:")
        print(df.info())
        print('')
        
        logger.info(f"Extracción completada: {len(df)} registros")
        return df
        
    except Exception as e:
        logger.error(f"Error en la extracción: {str(e)}")
        raise

### ▶️ Ejecutemos la Extracción

Ahora vamos a ejecutar la función de extracción para ver los datos:

In [24]:
# Ejecutar la función de extracción
raw_data = extract_from_database()

2025-08-31 15:46:34,049 - INFO - Iniciando extracción de datos...
2025-08-31 15:46:34,052 - INFO - Extracción completada: 6 registros


--- 1. Datos Crudos Extraídos ---
Registros extraídos: 6
Columnas: ['ID_USER', 'user_name', 'registration_date', 'total_spent', 'country_code', 'email']

Primeras filas:
   ID_USER user_name registration_date total_spent country_code  \
0      101       Ana        2025-01-15       150.5           ES   
1      102      Luis        2025-02-20        80.0           MX   
2      103     Marta        2025-03-01        -999           ES   
3      104      Juan        2025-04-10       200.0           CO   
4      105       Eva        2025-05-19       45.25           es   
5      106      None        2025-06-30     invalid           ES   

             email  
0    ana@email.com  
1   luis@email.com  
2  marta@email.com  
3   juan@email.com  
4    eva@email.com  
5  pedro@email.com  

Información del DataFrame:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6 entries, 0 to 5
Data columns (total 6 columns):
 #   Column             Non-Null Count  Dtype 
---  ------             --------------

### 📁 Ejemplo: Extracción desde Archivo CSV

Veamos cómo extraer datos desde un archivo CSV (otra fuente común):

In [37]:
def extract_from_csv(file_path='sample_products.csv'):
    """
    Extrae datos desde un archivo CSV.
    
    Args:
        file_path (str): Ruta del archivo CSV
    
    Returns:
        pd.DataFrame: Datos extraídos del CSV
    """
    try:
        # Crear un archivo CSV de ejemplo
        sample_data = {
            'product_id': [1, 2, 3, 4, 5],
            'product_name': ['Laptop', 'Mouse', 'Keyboard', 'Monitor', 'Headphones'],
            'price': [999.99, 25.50, 75.00, 299.99, 89.99],
            'category': ['Electronics', 'Accessories', 'Accessories', 'Electronics', 'Accessories']
        }
        
        df_sample = pd.DataFrame(sample_data)
        df_sample.to_csv(file_path, index=False)
        
        # Leer el archivo CSV
        df = pd.read_csv(file_path)
        
        print("--- Extracción desde CSV ---")
        print(df)
        return df
        
    except FileNotFoundError:
        logger.error(f"Archivo no encontrado: {file_path}")
        return pd.DataFrame()
    except Exception as e:
        logger.error(f"Error leyendo CSV: {str(e)}")
        return pd.DataFrame()

# Ejemplo de uso
products_df = extract_from_csv()

--- Extracción desde CSV ---
   product_id product_name   price     category
0           1       Laptop  999.99  Electronics
1           2        Mouse   25.50  Accessories
2           3     Keyboard   75.00  Accessories
3           4      Monitor  299.99  Electronics
4           5   Headphones   89.99  Accessories


## 2. Transform: Aplicación de Reglas de Negocio

### 🎯 Objetivos de la Fase Transform
- Limpiar datos inconsistentes o erróneos
- Validar que los datos cumplan reglas de negocio
- Enriquecer datos con información adicional
- Aplicar transformaciones matemáticas o lógicas
- Normalizar formatos y estructuras

### 🔧 Técnicas Comunes de Transformación
- **Limpieza**: Eliminar duplicados, corregir errores tipográficos
- **Validación**: Verificar rangos, formatos, tipos de datos
- **Enriquecimiento**: Agregar datos calculados o de referencia
- **Normalización**: Estandarizar formatos y escalas
- **Agregación**: Resumir datos por grupos o períodos

**Ejemplo:** Aplicaremos las siguientes reglas:
1. Corregir valores erróneos: El valor `-999` en `total_spent` debe ser 0
2. Corregir inconsistencias: El código de país `es` debe ser `ES`
3. Filtrar datos: Solo usuarios con gasto mayor a 50
4. Manejar valores nulos en nombres de usuario

In [38]:
def apply_business_rules(raw_data):
    """
    Aplica reglas de negocio y limpieza de datos.
    
    Args:
        raw_data (pd.DataFrame): Datos crudos a transformar
    
    Returns:
        pd.DataFrame: Datos limpios y transformados
    """
    try:
        logger.info("Iniciando transformación de datos...")
        cleaned_data = raw_data.copy()
        
        print("--- Análisis de Calidad de Datos ---")
        print(f"Registros iniciales: {len(cleaned_data)}")
        print(f"Valores nulos por columna:")
        print(cleaned_data.isnull().sum())
        print()
        
        # 1. Manejar valores nulos en user_name
        null_names = cleaned_data['user_name'].isnull().sum()
        if null_names > 0:
            print(f"⚠️  Encontrados {null_names} nombres de usuario nulos")
            cleaned_data['user_name'] = cleaned_data['user_name'].fillna('Usuario_Desconocido')
        
        # 2. Corregir valores erróneos en total_spent
        # Convertir valores no numéricos a NaN, luego reemplazar -999 por 0
        cleaned_data['total_spent'] = pd.to_numeric(cleaned_data['total_spent'], errors='coerce')
        invalid_spent = cleaned_data['total_spent'].isnull().sum()
        if invalid_spent > 0:
            print(f"⚠️  Encontrados {invalid_spent} valores inválidos en total_spent")
            cleaned_data['total_spent'] = cleaned_data['total_spent'].fillna(0)
        
        cleaned_data['total_spent'] = cleaned_data['total_spent'].replace(-999, 0)
        
        # 3. Normalizar códigos de país
        cleaned_data['country_code'] = cleaned_data['country_code'].str.upper()
        
        # 4. Aplicar regla de negocio: solo usuarios con gasto > 50
        initial_count = len(cleaned_data)
        cleaned_data = cleaned_data[cleaned_data['total_spent'] > 50]
        filtered_count = initial_count - len(cleaned_data)
        
        print(f"📊 Registros filtrados (gasto <= 50): {filtered_count}")
        print(f"📊 Registros finales: {len(cleaned_data)}")
        
        print("\n--- 2. Datos Limpios (Reglas Aplicadas) ---")
        print(cleaned_data)
        print('')
        
        logger.info(f"Transformación completada: {len(cleaned_data)} registros válidos")
        return cleaned_data
        
    except Exception as e:
        logger.error(f"Error en la transformación: {str(e)}")
        raise

### 🔍 Técnicas Avanzadas de Transformación

Veamos algunas técnicas adicionales de transformación:

In [39]:
def advanced_transformations(df):
    """
    Aplica transformaciones avanzadas a los datos.
    
    Args:
        df (pd.DataFrame): DataFrame a transformar
    
    Returns:
        pd.DataFrame: DataFrame con transformaciones aplicadas
    """
    df_transformed = df.copy()
    
    # 1. Crear columnas derivadas
    df_transformed['registration_year'] = pd.to_datetime(df_transformed['registration_date']).dt.year
    df_transformed['registration_month'] = pd.to_datetime(df_transformed['registration_date']).dt.month
    
    # 2. Categorizar gastos
    def categorize_spending(amount):
        if amount < 100:
            return 'Bajo'
        elif amount < 200:
            return 'Medio'
        else:
            return 'Alto'
    
    df_transformed['spending_category'] = df_transformed['total_spent'].apply(categorize_spending)
    
    # 3. Validar emails (ejemplo básico)
    if 'email' in df_transformed.columns:
        df_transformed['email_valid'] = df_transformed['email'].str.contains('@', na=False)
    
    print("--- Transformaciones Avanzadas ---")
    print(df_transformed[['user_name', 'total_spent', 'spending_category', 'registration_year']].head())
    
    return df_transformed

# Esta función se puede usar después de apply_business_rules
# df_advanced = advanced_transformations(cleaned_data)

## 3. Transform: Normalización del Esquema

### 🎯 Objetivos de la Normalización
- Estandarizar nombres de columnas
- Asegurar tipos de datos correctos
- Mantener consistencia entre sistemas
- Facilitar la integración con el destino

El objetivo es asegurar que el esquema de los datos (nombres de columnas, tipos de datos) sea consistente.

**Ejemplo:** Vamos a:
1. Cambiar nombres de columnas a un formato estándar (snake_case)
2. Asegurar que `registration_date` sea de tipo fecha
3. Reordenar columnas según el esquema del destino

In [40]:
def normalize_schema(cleaned_data):
    """
    Normaliza el esquema de datos según estándares del data warehouse.
    
    Args:
        cleaned_data (pd.DataFrame): Datos limpios a normalizar
    
    Returns:
        pd.DataFrame: Datos con esquema normalizado
    """
    try:
        logger.info("Iniciando normalización del esquema...")
        structured_data = cleaned_data.copy()
        
        print("--- Análisis del Esquema Original ---")
        print(f"Columnas originales: {list(structured_data.columns)}")
        print(f"Tipos de datos originales:")
        print(structured_data.dtypes)
        print()
        
        # 1. Normalizar nombres de columnas (snake_case)
        column_mapping = {
            'ID_USER': 'user_id',
            'user_name': 'user_name',
            'registration_date': 'registration_date',
            'total_spent': 'total_spent',
            'country_code': 'country_code',
            'email': 'email_address'
        }
        
        # Solo renombrar columnas que existen
        existing_columns = {k: v for k, v in column_mapping.items() if k in structured_data.columns}
        structured_data = structured_data.rename(columns=existing_columns)
        
        # 2. Convertir tipos de datos
        structured_data['registration_date'] = pd.to_datetime(structured_data['registration_date'])
        structured_data['user_id'] = structured_data['user_id'].astype('int64')
        structured_data['total_spent'] = structured_data['total_spent'].astype('float64')
        
        # 3. Agregar metadatos de procesamiento
        structured_data['processed_at'] = datetime.now()
        structured_data['data_source'] = 'user_database'
        
        # 4. Reordenar columnas según esquema del data warehouse
        column_order = ['user_id', 'user_name', 'email_address', 'country_code', 
                       'registration_date', 'total_spent', 'processed_at', 'data_source']
        
        # Solo incluir columnas que existen
        available_columns = [col for col in column_order if col in structured_data.columns]
        structured_data = structured_data[available_columns]
        
        print("--- 3. Datos Estructurados (Esquema Normalizado) ---")
        print(f"Columnas finales: {list(structured_data.columns)}")
        print(structured_data)
        print("\nInformación del esquema final:")
        print(structured_data.info())
        print('')
        
        logger.info(f"Normalización completada: {len(structured_data)} registros estructurados")
        return structured_data
        
    except Exception as e:
        logger.error(f"Error en la normalización: {str(e)}")
        raise

## 4. Load: Carga de Datos

### 🎯 Objetivos de la Fase Load
- Cargar datos en el sistema de destino
- Mantener integridad referencial
- Optimizar el rendimiento de carga
- Implementar estrategias de carga incremental

### 📊 Estrategias de Carga
- **Full Load**: Carga completa de todos los datos
- **Incremental Load**: Solo datos nuevos o modificados
- **Upsert**: Insertar nuevos registros, actualizar existentes
- **SCD (Slowly Changing Dimensions)**: Manejo de cambios históricos

La fase final es cargar los datos transformados en el sistema de destino (Data Warehouse, Data Lake, etc.).

**Ejemplo:** Simularemos diferentes tipos de carga:

In [41]:
def load_to_warehouse(structured_data, load_type='full'):
    """
    Carga datos al data warehouse con diferentes estrategias.
    
    Args:
        structured_data (pd.DataFrame): Datos estructurados a cargar
        load_type (str): Tipo de carga ('full', 'incremental', 'upsert')
    
    Returns:
        bool: True si la carga fue exitosa
    """
    try:
        logger.info(f"Iniciando carga de datos - Tipo: {load_type}")
        
        # Validaciones pre-carga
        if structured_data.empty:
            logger.warning("No hay datos para cargar")
            return False
        
        print("--- Validaciones Pre-Carga ---")
        print(f"Registros a cargar: {len(structured_data)}")
        print(f"Columnas: {list(structured_data.columns)}")
        
        # Verificar duplicados
        duplicates = structured_data.duplicated(subset=['user_id']).sum()
        if duplicates > 0:
            print(f"⚠️  Encontrados {duplicates} registros duplicados")
            structured_data = structured_data.drop_duplicates(subset=['user_id'], keep='last')
        
        # Simular diferentes tipos de carga
        if load_type == 'full':
            # Carga completa - reemplaza todos los datos
            filename = 'data_warehouse_users_full.csv'
            structured_data.to_csv(filename, index=False)
            print(f"✅ Carga completa realizada: {filename}")
            
        elif load_type == 'incremental':
            # Carga incremental - solo nuevos registros
            filename = 'data_warehouse_users_incremental.csv'
            # En un caso real, verificaríamos qué registros ya existen
            structured_data.to_csv(filename, mode='a', header=False, index=False)
            print(f"✅ Carga incremental realizada: {filename}")
            
        elif load_type == 'upsert':
            # Upsert - insertar nuevos, actualizar existentes
            filename = 'data_warehouse_users_upsert.csv'
            structured_data.to_csv(filename, index=False)
            print(f"✅ Upsert realizado: {filename}")
        
        print("\n--- 4. Datos Cargados ---")
        print(f"Registros cargados exitosamente: {len(structured_data)}")
        print("\nPrimeras filas del archivo de destino:")
        
        # Mostrar contenido del archivo
        with open(filename, 'r') as f:
            lines = f.readlines()[:6]  # Mostrar solo las primeras 6 líneas
            print(''.join(lines))
        
        # Estadísticas de carga
        print("\n--- Estadísticas de Carga ---")
        print(f"📊 Total de registros: {len(structured_data)}")
        print(f"📊 Países únicos: {structured_data['country_code'].nunique()}")
        print(f"📊 Gasto promedio: ${structured_data['total_spent'].mean():.2f}")
        print(f"📊 Gasto total: ${structured_data['total_spent'].sum():.2f}")
        
        logger.info(f"Carga completada exitosamente: {len(structured_data)} registros")
        return True
        
    except Exception as e:
        logger.error(f"Error en la carga: {str(e)}")
        return False

## Ejecución del Pipeline Completo

Finalmente, orquestamos todas las funciones en el pipeline principal.

In [42]:
def etl_pipeline_with_monitoring():
    """
    Pipeline ETL completo con monitoreo y manejo de errores.
    """
    start_time = datetime.now()
    
    try:
        print("🚀 Iniciando Pipeline ETL")
        print(f"⏰ Hora de inicio: {start_time}")
        print("=" * 50)
        
        # Extract
        raw_data = extract_from_database()
        
        # Transform
        cleaned_data = apply_business_rules(raw_data)
        
        # Normalize
        structured_data = normalize_schema(cleaned_data)
        
        # Load
        success = load_to_warehouse(structured_data, load_type='full')
        
        # Métricas finales
        end_time = datetime.now()
        duration = end_time - start_time
        
        print("\n" + "=" * 50)
        print("📊 RESUMEN DEL PIPELINE ETL")
        print("=" * 50)
        print(f"✅ Estado: {'EXITOSO' if success else 'FALLIDO'}")
        print(f"⏱️  Duración total: {duration.total_seconds():.2f} segundos")
        print(f"📥 Registros extraídos: {len(raw_data)}")
        print(f"🔄 Registros procesados: {len(cleaned_data)}")
        print(f"📤 Registros cargados: {len(structured_data)}")
        print(f"📉 Tasa de filtrado: {((len(raw_data) - len(structured_data)) / len(raw_data) * 100):.1f}%")
        
        return structured_data
        
    except Exception as e:
        logger.error(f"Pipeline falló: {str(e)}")
        print(f"❌ Error en el pipeline: {str(e)}")
        return None

# Ejecutar el pipeline completo
result = etl_pipeline_with_monitoring()

2025-08-31 16:07:09,243 - INFO - Iniciando extracción de datos...
2025-08-31 16:07:09,247 - INFO - Extracción completada: 6 registros
2025-08-31 16:07:09,248 - INFO - Iniciando transformación de datos...
2025-08-31 16:07:09,251 - INFO - Transformación completada: 3 registros válidos
2025-08-31 16:07:09,252 - INFO - Iniciando normalización del esquema...
2025-08-31 16:07:09,257 - INFO - Normalización completada: 3 registros estructurados
2025-08-31 16:07:09,257 - INFO - Iniciando carga de datos - Tipo: full
2025-08-31 16:07:09,259 - INFO - Carga completada exitosamente: 3 registros


🚀 Iniciando Pipeline ETL
⏰ Hora de inicio: 2025-08-31 16:07:09.243120
--- 1. Datos Crudos Extraídos ---
Registros extraídos: 6
Columnas: ['ID_USER', 'user_name', 'registration_date', 'total_spent', 'country_code', 'email']

Primeras filas:
   ID_USER user_name registration_date total_spent country_code  \
0      101       Ana        2025-01-15       150.5           ES   
1      102      Luis        2025-02-20        80.0           MX   
2      103     Marta        2025-03-01        -999           ES   
3      104      Juan        2025-04-10       200.0           CO   
4      105       Eva        2025-05-19       45.25           es   
5      106      None        2025-06-30     invalid           ES   

             email  
0    ana@email.com  
1   luis@email.com  
2  marta@email.com  
3   juan@email.com  
4    eva@email.com  
5  pedro@email.com  

Información del DataFrame:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6 entries, 0 to 5
Data columns (total 6 columns):
 #   Column    

## 🔧 Ejecución Paso a Paso

Si quieres ver cada paso por separado, ejecuta las siguientes celdas una por una:

In [43]:
# Paso 1: Extracción
print("=== PASO 1: EXTRACCIÓN ===")
raw_data_step = extract_from_database()

2025-08-31 16:07:40,174 - INFO - Iniciando extracción de datos...
2025-08-31 16:07:40,180 - INFO - Extracción completada: 6 registros


=== PASO 1: EXTRACCIÓN ===
--- 1. Datos Crudos Extraídos ---
Registros extraídos: 6
Columnas: ['ID_USER', 'user_name', 'registration_date', 'total_spent', 'country_code', 'email']

Primeras filas:
   ID_USER user_name registration_date total_spent country_code  \
0      101       Ana        2025-01-15       150.5           ES   
1      102      Luis        2025-02-20        80.0           MX   
2      103     Marta        2025-03-01        -999           ES   
3      104      Juan        2025-04-10       200.0           CO   
4      105       Eva        2025-05-19       45.25           es   
5      106      None        2025-06-30     invalid           ES   

             email  
0    ana@email.com  
1   luis@email.com  
2  marta@email.com  
3   juan@email.com  
4    eva@email.com  
5  pedro@email.com  

Información del DataFrame:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6 entries, 0 to 5
Data columns (total 6 columns):
 #   Column             Non-Null Count  Dtype 
---  ------

In [44]:
# Paso 2: Transformación
print("=== PASO 2: TRANSFORMACIÓN ===")
cleaned_data_step = apply_business_rules(raw_data_step)

2025-08-31 16:07:48,604 - INFO - Iniciando transformación de datos...
2025-08-31 16:07:48,607 - INFO - Transformación completada: 3 registros válidos


=== PASO 2: TRANSFORMACIÓN ===
--- Análisis de Calidad de Datos ---
Registros iniciales: 6
Valores nulos por columna:
ID_USER              0
user_name            1
registration_date    0
total_spent          0
country_code         0
email                0
dtype: int64

⚠️  Encontrados 1 nombres de usuario nulos
⚠️  Encontrados 1 valores inválidos en total_spent
📊 Registros filtrados (gasto <= 50): 3
📊 Registros finales: 3

--- 2. Datos Limpios (Reglas Aplicadas) ---
   ID_USER user_name registration_date  total_spent country_code  \
0      101       Ana        2025-01-15        150.5           ES   
1      102      Luis        2025-02-20         80.0           MX   
3      104      Juan        2025-04-10        200.0           CO   

            email  
0   ana@email.com  
1  luis@email.com  
3  juan@email.com  



In [45]:
# Paso 3: Normalización
print("=== PASO 3: NORMALIZACIÓN ===")
structured_data_step = normalize_schema(cleaned_data_step)

2025-08-31 16:07:56,773 - INFO - Iniciando normalización del esquema...
2025-08-31 16:07:56,779 - INFO - Normalización completada: 3 registros estructurados


=== PASO 3: NORMALIZACIÓN ===
--- Análisis del Esquema Original ---
Columnas originales: ['ID_USER', 'user_name', 'registration_date', 'total_spent', 'country_code', 'email']
Tipos de datos originales:
ID_USER                int64
user_name             object
registration_date     object
total_spent          float64
country_code          object
email                 object
dtype: object

--- 3. Datos Estructurados (Esquema Normalizado) ---
Columnas finales: ['user_id', 'user_name', 'email_address', 'country_code', 'registration_date', 'total_spent', 'processed_at', 'data_source']
   user_id user_name   email_address country_code registration_date  \
0      101       Ana   ana@email.com           ES        2025-01-15   
1      102      Luis  luis@email.com           MX        2025-02-20   
3      104      Juan  juan@email.com           CO        2025-04-10   

   total_spent               processed_at    data_source  
0        150.5 2025-08-31 16:07:56.775963  user_database  
1         

In [46]:
# Paso 4: Carga
print("=== PASO 4: CARGA ===")
success = load_to_warehouse(structured_data_step, load_type='full')
print(f"\n✅ Pipeline completado exitosamente: {success}")

2025-08-31 16:08:02,755 - INFO - Iniciando carga de datos - Tipo: full
2025-08-31 16:08:02,759 - INFO - Carga completada exitosamente: 3 registros


=== PASO 4: CARGA ===
--- Validaciones Pre-Carga ---
Registros a cargar: 3
Columnas: ['user_id', 'user_name', 'email_address', 'country_code', 'registration_date', 'total_spent', 'processed_at', 'data_source']
✅ Carga completa realizada: data_warehouse_users_full.csv

--- 4. Datos Cargados ---
Registros cargados exitosamente: 3

Primeras filas del archivo de destino:
user_id,user_name,email_address,country_code,registration_date,total_spent,processed_at,data_source
101,Ana,ana@email.com,ES,2025-01-15,150.5,2025-08-31 16:07:56.775963,user_database
102,Luis,luis@email.com,MX,2025-02-20,80.0,2025-08-31 16:07:56.775963,user_database
104,Juan,juan@email.com,CO,2025-04-10,200.0,2025-08-31 16:07:56.775963,user_database


--- Estadísticas de Carga ---
📊 Total de registros: 3
📊 Países únicos: 3
📊 Gasto promedio: $143.50
📊 Gasto total: $430.50

✅ Pipeline completado exitosamente: True
