# 🔄 Proceso ETL - Dataset Olist hacia MongoDB

## Extracción, Transformación y Carga de datos de e-commerce brasileño

Este notebook implementa un proceso ETL completo para:
1. **Extraer** datos de archivos CSV del dataset Olist
2. **Transformar** y limpiar los datos para análisis
3. **Cargar** los datos procesados en MongoDB

---

In [6]:
# Importar librerías necesarias
import pandas as pd
import numpy as np
import pymongo
from pymongo import MongoClient
from datetime import datetime, timedelta
import warnings
import json
import os
from typing import Dict, List, Any
import logging

# Configuraciones
warnings.filterwarnings('ignore')
pd.set_option('display.max_columns', None)

# Configurar logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

print("✅ Librerías importadas exitosamente")
print(f"📅 Proceso ETL iniciado: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

✅ Librerías importadas exitosamente
📅 Proceso ETL iniciado: 2025-08-04 14:51:18


## 1. 📥 EXTRACCIÓN (Extract)

Carga de datos desde archivos CSV del dataset Olist

In [7]:
def extract_data() -> Dict[str, pd.DataFrame]:
    """
    Extrae todos los archivos CSV del dataset Olist
    
    Returns:
        Dict[str, pd.DataFrame]: Diccionario con todos los datasets cargados
    """
    logger.info("🔄 Iniciando extracción de datos...")
    
    # Definir archivos y sus nombres de colección en MongoDB
    files_config = {
        'customers': 'olist_customers_dataset.csv',
        'geolocation': 'olist_geolocation_dataset.csv',
        'order_items': 'olist_order_items_dataset.csv',
        'order_payments': 'olist_order_payments_dataset.csv',
        'order_reviews': 'olist_order_reviews_dataset.csv',
        'orders': 'olist_orders_dataset.csv',
        'products': 'olist_products_dataset.csv',
        'sellers': 'olist_sellers_dataset.csv',
        'product_categories': 'product_category_name_translation.csv'
    }
    
    datasets = {}
    total_rows = 0
    
    for dataset_name, filename in files_config.items():
        try:
            file_path = f'data/{filename}'
            df = pd.read_csv(file_path)
            datasets[dataset_name] = df
            total_rows += len(df)
            logger.info(f"✅ {dataset_name}: {len(df):,} filas cargadas")
            
        except FileNotFoundError:
            logger.error(f"❌ Archivo no encontrado: {file_path}")
        except Exception as e:
            logger.error(f"❌ Error cargando {filename}: {str(e)}")
    
    logger.info(f"📊 Extracción completada: {len(datasets)} datasets, {total_rows:,} filas totales")
    return datasets

# Ejecutar extracción
raw_data = extract_data()

2025-08-04 14:51:21,349 - INFO - 🔄 Iniciando extracción de datos...
2025-08-04 14:51:21,760 - INFO - ✅ customers: 99,441 filas cargadas
2025-08-04 14:51:21,760 - INFO - ✅ customers: 99,441 filas cargadas
2025-08-04 14:51:23,433 - INFO - ✅ geolocation: 1,000,163 filas cargadas
2025-08-04 14:51:23,433 - INFO - ✅ geolocation: 1,000,163 filas cargadas
2025-08-04 14:51:24,016 - INFO - ✅ order_items: 112,650 filas cargadas
2025-08-04 14:51:24,016 - INFO - ✅ order_items: 112,650 filas cargadas
2025-08-04 14:51:24,289 - INFO - ✅ order_payments: 103,886 filas cargadas
2025-08-04 14:51:24,289 - INFO - ✅ order_payments: 103,886 filas cargadas
2025-08-04 14:51:25,071 - INFO - ✅ order_reviews: 99,224 filas cargadas
2025-08-04 14:51:25,071 - INFO - ✅ order_reviews: 99,224 filas cargadas
2025-08-04 14:51:25,889 - INFO - ✅ orders: 99,441 filas cargadas
2025-08-04 14:51:25,889 - INFO - ✅ orders: 99,441 filas cargadas
2025-08-04 14:51:25,983 - INFO - ✅ products: 32,951 filas cargadas
2025-08-04 14:51:25

## 2. 🔧 TRANSFORMACIÓN (Transform)

Limpieza, validación y transformación de datos para optimizar el almacenamiento en MongoDB

In [8]:
def transform_dates(df: pd.DataFrame, date_columns: List[str]) -> pd.DataFrame:
    """
    Convierte columnas de fecha a formato datetime y maneja valores nulos
    """
    df_transformed = df.copy()
    
    for col in date_columns:
        if col in df_transformed.columns:
            df_transformed[col] = pd.to_datetime(df_transformed[col], errors='coerce')
            
    return df_transformed

def transform_customers(customers_df: pd.DataFrame) -> pd.DataFrame:
    """
    Transforma dataset de clientes
    """
    logger.info("🔄 Transformando datos de clientes...")
    
    df = customers_df.copy()
    
    # Limpiar datos de ubicación
    df['customer_city'] = df['customer_city'].str.lower().str.strip()
    df['customer_state'] = df['customer_state'].str.upper().str.strip()
    
    # Agregar campos derivados
    df['customer_location'] = df['customer_city'] + ', ' + df['customer_state']
    
    # Validar códigos postales (CEP brasileño)
    df['customer_zip_code_prefix'] = df['customer_zip_code_prefix'].astype(str).str.zfill(5)
    
    logger.info(f"✅ Clientes transformados: {len(df):,} registros")
    return df

def transform_orders(orders_df: pd.DataFrame) -> pd.DataFrame:
    """
    Transforma dataset de órdenes
    """
    logger.info("🔄 Transformando datos de órdenes...")
    
    df = orders_df.copy()
    
    # Convertir fechas
    date_columns = ['order_purchase_timestamp', 'order_approved_at', 
                   'order_delivered_carrier_date', 'order_delivered_customer_date',
                   'order_estimated_delivery_date']
    
    df = transform_dates(df, date_columns)
    
    # Agregar campos derivados
    df['order_year'] = df['order_purchase_timestamp'].dt.year
    df['order_month'] = df['order_purchase_timestamp'].dt.month
    df['order_weekday'] = df['order_purchase_timestamp'].dt.dayofweek
    df['order_hour'] = df['order_purchase_timestamp'].dt.hour
    
    # Calcular tiempos de entrega
    df['days_to_deliver'] = (df['order_delivered_customer_date'] - df['order_purchase_timestamp']).dt.days
    df['days_vs_estimated'] = (df['order_delivered_customer_date'] - df['order_estimated_delivery_date']).dt.days
    
    # Categorizar estado de entrega
    df['delivery_status'] = df.apply(lambda row: 
        'on_time' if pd.notna(row['days_vs_estimated']) and row['days_vs_estimated'] <= 0
        else 'late' if pd.notna(row['days_vs_estimated']) and row['days_vs_estimated'] > 0
        else 'unknown', axis=1)
    
    logger.info(f"✅ Órdenes transformadas: {len(df):,} registros")
    return df

def transform_products(products_df: pd.DataFrame, categories_df: pd.DataFrame) -> pd.DataFrame:
    """
    Transforma dataset de productos y añade categorías en inglés
    """
    logger.info("🔄 Transformando datos de productos...")
    
    df = products_df.copy()
    
    # Unir con traducciones de categorías
    df = df.merge(categories_df, on='product_category_name', how='left')
    
    # Limpiar y estandarizar categorías
    df['product_category_name'] = df['product_category_name'].fillna('unknown')
    df['product_category_name_english'] = df['product_category_name_english'].fillna('unknown')
    
    # Calcular volumen del producto
    df['product_volume_cm3'] = (df['product_length_cm'] * 
                               df['product_height_cm'] * 
                               df['product_width_cm'])
    
    # Categorizar tamaño por peso
    df['weight_category'] = pd.cut(df['product_weight_g'], 
                                  bins=[0, 500, 2000, 10000, float('inf')],
                                  labels=['light', 'medium', 'heavy', 'very_heavy'])
    
    logger.info(f"✅ Productos transformados: {len(df):,} registros")
    return df

def transform_order_items(order_items_df: pd.DataFrame) -> pd.DataFrame:
    """
    Transforma dataset de items de órdenes
    """
    logger.info("🔄 Transformando datos de items...")
    
    df = order_items_df.copy()
    
    # Calcular métricas de precio
    df['unit_price'] = df['price'] / df['order_item_id']  # Precio por unidad
    df['total_item_value'] = df['price'] + df['freight_value']  # Valor total con envío
    
    # Categorizar valor del envío
    df['freight_category'] = pd.cut(df['freight_value'], 
                                   bins=[0, 10, 30, 100, float('inf')],
                                   labels=['low', 'medium', 'high', 'very_high'])
    
    logger.info(f"✅ Items transformados: {len(df):,} registros")
    return df

def transform_payments(payments_df: pd.DataFrame) -> pd.DataFrame:
    """
    Transforma dataset de pagos
    """
    logger.info("🔄 Transformando datos de pagos...")
    
    df = payments_df.copy()
    
    # Categorizar valores de pago
    df['payment_range'] = pd.cut(df['payment_value'], 
                                bins=[0, 50, 100, 200, 500, float('inf')],
                                labels=['very_low', 'low', 'medium', 'high', 'very_high'])
    
    # Normalizar tipos de pago
    df['payment_type'] = df['payment_type'].str.lower().str.replace('_', ' ')
    
    logger.info(f"✅ Pagos transformados: {len(df):,} registros")
    return df

def transform_reviews(reviews_df: pd.DataFrame) -> pd.DataFrame:
    """
    Transforma dataset de reseñas
    """
    logger.info("🔄 Transformando datos de reseñas...")
    
    df = reviews_df.copy()
    
    # Convertir fechas
    date_columns = ['review_creation_date', 'review_answer_timestamp']
    df = transform_dates(df, date_columns)
    
    # Categorizar satisfacción
    df['satisfaction_level'] = df['review_score'].map({
        1: 'very_dissatisfied',
        2: 'dissatisfied', 
        3: 'neutral',
        4: 'satisfied',
        5: 'very_satisfied'
    })
    
    # Analizar longitud de comentarios
    df['comment_title_length'] = df['review_comment_title'].str.len().fillna(0)
    df['comment_message_length'] = df['review_comment_message'].str.len().fillna(0)
    df['has_comment'] = (df['comment_title_length'] + df['comment_message_length']) > 0
    
    logger.info(f"✅ Reseñas transformadas: {len(df):,} registros")
    return df

# Aplicar todas las transformaciones
def transform_all_data(raw_data: Dict[str, pd.DataFrame]) -> Dict[str, pd.DataFrame]:
    """
    Aplica todas las transformaciones a los datasets
    """
    logger.info("🔄 Iniciando transformación completa de datos...")
    
    transformed_data = {}
    
    # Transformar cada dataset
    transformed_data['customers'] = transform_customers(raw_data['customers'])
    transformed_data['orders'] = transform_orders(raw_data['orders'])
    transformed_data['products'] = transform_products(raw_data['products'], raw_data['product_categories'])
    transformed_data['order_items'] = transform_order_items(raw_data['order_items'])
    transformed_data['payments'] = transform_payments(raw_data['order_payments'])
    transformed_data['reviews'] = transform_reviews(raw_data['order_reviews'])
    
    # Datasets que no requieren transformación especial
    transformed_data['sellers'] = raw_data['sellers'].copy()
    transformed_data['geolocation'] = raw_data['geolocation'].copy()
    
    logger.info("✅ Transformación completa finalizada")
    return transformed_data

## 3. 📤 CARGA (Load) - Configuración MongoDB

Configuración de conexión y carga de datos a MongoDB

In [16]:
# Configuración de MongoDB
MONGODB_CONFIG = {
    'host': 'localhost',  # Cambiar por tu host de MongoDB
    'port': 27017,        # Puerto por defecto de MongoDB
    'database': 'olist_ecommerce',  # Nombre de la base de datos
    'username': None,     # Agregar si usas autenticación
    'password': None      # Agregar si usas autenticación
}

def get_mongodb_connection():
    """
    Establece conexión con MongoDB
    """
    try:
        # Construir URI de conexión
        if MONGODB_CONFIG['username'] and MONGODB_CONFIG['password']:
            connection_string = f"mongodb://{MONGODB_CONFIG['username']}:{MONGODB_CONFIG['password']}@{MONGODB_CONFIG['host']}:{MONGODB_CONFIG['port']}/{MONGODB_CONFIG['database']}"
        else:
            connection_string = f"mongodb://{MONGODB_CONFIG['host']}:{MONGODB_CONFIG['port']}/"
        
        # Conectar a MongoDB
        client = MongoClient(connection_string)
        
        # Verificar conexión
        client.admin.command('ping')
        
        # Obtener base de datos
        db = client[MONGODB_CONFIG['database']]
        
        logger.info(f"✅ Conexión exitosa a MongoDB: {MONGODB_CONFIG['database']}")
        return client, db
        
    except Exception as e:
        logger.error(f"❌ Error conectando a MongoDB: {str(e)}")
        return None, None

def create_indexes(db):
    """
    Crea índices para optimizar consultas
    """
    logger.info("🔄 Creando índices para optimizar consultas...")
    
    try:
        # Índices para órdenes
        db.orders.create_index("customer_id")
        db.orders.create_index("order_status")
        db.orders.create_index("order_purchase_timestamp")
        db.orders.create_index([("order_year", 1), ("order_month", 1)])
        
        # Índices para items
        db.order_items.create_index("order_id")
        db.order_items.create_index("product_id")
        db.order_items.create_index("seller_id")
        
        # Índices para pagos
        db.payments.create_index("order_id")
        db.payments.create_index("payment_type")
        
        # Índices para reseñas
        db.reviews.create_index("order_id")
        db.reviews.create_index("review_score")
        
        # Índices para clientes
        db.customers.create_index("customer_state")
        db.customers.create_index("customer_city")
        
        # Índices para productos
        db.products.create_index("product_category_name_english")
        
        logger.info("✅ Índices creados exitosamente")
        
    except Exception as e:
        logger.error(f"❌ Error creando índices: {str(e)}")

# Establecer conexión inicial
print("🔄 Configurando conexión a MongoDB...")
print(f"📍 Host: {MONGODB_CONFIG['host']}:{MONGODB_CONFIG['port']}")
print(f"🗄️ Base de datos: {MONGODB_CONFIG['database']}")
print("\n⚠️  NOTA: Asegúrate de que MongoDB esté ejecutándose antes de continuar")

🔄 Configurando conexión a MongoDB...
📍 Host: localhost:27017
🗄️ Base de datos: olist_ecommerce

⚠️  NOTA: Asegúrate de que MongoDB esté ejecutándose antes de continuar


In [None]:
def prepare_dataframe_for_mongodb(df: pd.DataFrame) -> List[Dict]:
    """
    Prepara un DataFrame para inserción en MongoDB
    """
    # Convertir DataFrame a diccionarios
    records = df.to_dict('records')
    
    # Convertir tipos de datos problemáticos
    for record in records:
        for key, value in record.items():
            # Convertir NaN y tipos numpy a tipos nativos de Python
            if pd.isna(value):
                record[key] = None
            elif isinstance(value, (np.integer, np.floating)):
                record[key] = value.item()
            elif isinstance(value, np.ndarray):
                record[key] = value.tolist()
            elif hasattr(value, 'isoformat'):  # datetime objects
                record[key] = value
    
    return records

def load_to_mongodb(db, collection_name: str, data: pd.DataFrame, batch_size: int = 1000) -> bool:
    """
    Carga datos a una colección de MongoDB en lotes
    """
    try:
        logger.info(f"🔄 Cargando {len(data):,} registros a colección '{collection_name}'...")
        
        # Preparar datos para MongoDB
        records = prepare_dataframe_for_mongodb(data)
        
        # Obtener colección
        collection = db[collection_name]
        
        # Limpiar colección existente (opcional)
        collection.drop()
        logger.info(f"🗑️ Colección '{collection_name}' limpiada")
        
        # Insertar en lotes
        total_inserted = 0
        for i in range(0, len(records), batch_size):
            batch = records[i:i + batch_size]
            result = collection.insert_many(batch)
            total_inserted += len(result.inserted_ids)
            
            if i % (batch_size * 10) == 0:  # Log cada 10 lotes
                logger.info(f"📤 Progreso: {total_inserted:,}/{len(records):,} registros")
        
        logger.info(f"✅ Carga completada: {total_inserted:,} registros en '{collection_name}'")
        return True
        
    except Exception as e:
        logger.error(f"❌ Error cargando datos a '{collection_name}': {str(e)}")
        return False

def load_all_data_to_mongodb(transformed_data: Dict[str, pd.DataFrame]) -> bool:
    """
    Carga todos los datasets transformados a MongoDB
    """
    logger.info("🚀 Iniciando carga completa a MongoDB...")
    
    # Establecer conexión
    client, db = get_mongodb_connection()
    
    if db is None:
        logger.error("❌ No se pudo establecer conexión con MongoDB")
        return False
    
    success_count = 0
    total_collections = len(transformed_data)
    
    # Mapeo de datasets a nombres de colecciones
    collection_mapping = {
        'customers': 'customers',
        'orders': 'orders', 
        'products': 'products',
        'order_items': 'order_items',
        'payments': 'payments',
        'reviews': 'reviews',
        'sellers': 'sellers',
        'geolocation': 'geolocation'
    }
    
    try:
        # Cargar cada dataset
        for dataset_name, df in transformed_data.items():
            if dataset_name in collection_mapping:
                collection_name = collection_mapping[dataset_name]
                
                if load_to_mongodb(db, collection_name, df):
                    success_count += 1
                else:
                    logger.error(f"❌ Falló la carga de {dataset_name}")
        
        # Crear índices después de cargar todos los datos
        if success_count == total_collections:
            create_indexes(db)
            
        # Generar estadísticas finales
        logger.info(f"📊 Resumen de carga:")
        for collection_name in collection_mapping.values():
            count = db[collection_name].count_documents({})
            logger.info(f"   📋 {collection_name}: {count:,} documentos")
        
        logger.info(f"✅ Proceso ETL completado: {success_count}/{total_collections} colecciones cargadas")
        
        return success_count == total_collections
        
    except Exception as e:
        logger.error(f"❌ Error en proceso de carga: {str(e)}")
        return False
        
    finally:
        # Cerrar conexión
        if client:
            client.close()
            logger.info("🔌 Conexión a MongoDB cerrada")

## 4. 🎯 EJECUCIÓN DEL PROCESO ETL COMPLETO

Ejecuta todo el pipeline ETL desde extracción hasta carga en MongoDB

In [18]:
def run_complete_etl_pipeline():
    """
    Ejecuta el pipeline ETL completo
    """
    start_time = datetime.now()
    logger.info("🚀 INICIANDO PROCESO ETL COMPLETO")
    logger.info("="*60)
    
    try:
        # PASO 1: EXTRACCIÓN
        logger.info("📥 FASE 1: EXTRACCIÓN DE DATOS")
        raw_data = extract_data()
        
        if not raw_data:
            logger.error("❌ Error en extracción de datos")
            return False
        
        # PASO 2: TRANSFORMACIÓN
        logger.info("\n🔧 FASE 2: TRANSFORMACIÓN DE DATOS")
        transformed_data = transform_all_data(raw_data)
        
        # PASO 3: CARGA
        logger.info("\n📤 FASE 3: CARGA A MONGODB")
        success = load_all_data_to_mongodb(transformed_data)
        
        # RESUMEN FINAL
        end_time = datetime.now()
        duration = end_time - start_time
        
        logger.info("\n" + "="*60)
        if success:
            logger.info("🎉 ¡PROCESO ETL COMPLETADO EXITOSAMENTE!")
        else:
            logger.error("❌ PROCESO ETL FALLÓ")
            
        logger.info(f"⏱️ Tiempo total: {duration}")
        logger.info(f"📅 Finalizado: {end_time.strftime('%Y-%m-%d %H:%M:%S')}")
        logger.info("="*60)
        
        return success
        
    except Exception as e:
        logger.error(f"❌ Error crítico en pipeline ETL: {str(e)}")
        return False

# EJECUTAR ETL COMPLETO
print("🔄 ¿Estás listo para ejecutar el proceso ETL completo?")
print("📋 El proceso incluye:")
print("   1. ✅ Extracción de datos CSV")
print("   2. ✅ Transformación y limpieza")
print("   3. ✅ Carga a MongoDB con índices")
print()
print("⚠️  IMPORTANTE:")
print("   - Asegúrate de que MongoDB esté ejecutándose")
print("   - Los datos existentes en MongoDB serán sobrescritos")
print("   - El proceso puede tomar varios minutos")
print()
print("📝 Para ejecutar, descomenta y ejecuta la siguiente línea:")
print("# etl_success = run_complete_etl_pipeline()")

🔄 ¿Estás listo para ejecutar el proceso ETL completo?
📋 El proceso incluye:
   1. ✅ Extracción de datos CSV
   2. ✅ Transformación y limpieza
   3. ✅ Carga a MongoDB con índices

⚠️  IMPORTANTE:
   - Asegúrate de que MongoDB esté ejecutándose
   - Los datos existentes en MongoDB serán sobrescritos
   - El proceso puede tomar varios minutos

📝 Para ejecutar, descomenta y ejecuta la siguiente línea:
# etl_success = run_complete_etl_pipeline()


## 5. ✅ VALIDACIÓN Y CONSULTAS DE PRUEBA

Funciones para validar que los datos se cargaron correctamente y realizar consultas de prueba

In [None]:
def validate_mongodb_data():
    """
    Valida que los datos se cargaron correctamente en MongoDB
    """
    logger.info("🔍 Validando datos en MongoDB...")
    
    client, db = get_mongodb_connection()
    
    if db is None:
        logger.error("❌ No se pudo conectar a MongoDB para validación")
        return False
    
    try:
        collections = ['customers', 'orders', 'products', 'order_items', 
                      'payments', 'reviews', 'sellers', 'geolocation']
        
        print("📊 ESTADÍSTICAS DE DATOS EN MONGODB")
        print("="*50)
        
        total_documents = 0
        
        for collection_name in collections:
            collection = db[collection_name]
            count = collection.count_documents({})
            total_documents += count
            
            # Obtener un documento de ejemplo
            sample_doc = collection.find_one()
            fields_count = len(sample_doc.keys()) if sample_doc else 0
            
            print(f"📋 {collection_name}:")
            print(f"   📄 Documentos: {count:,}")
            print(f"   🏷️ Campos: {fields_count}")
            print()
        
        print(f"📊 TOTAL DE DOCUMENTOS: {total_documents:,}")
        
        # Validaciones específicas
        print("\\n🔍 VALIDACIONES ESPECÍFICAS:")
        
        # 1. Verificar que existen órdenes con estado 'delivered'
        delivered_orders = db.orders.count_documents({'order_status': 'delivered'})
        print(f"✅ Órdenes entregadas: {delivered_orders:,}")
        
        # 2. Verificar rangos de fechas
        date_range = db.orders.aggregate([
            {'$group': {
                '_id': None,
                'min_date': {'$min': '$order_purchase_timestamp'},
                'max_date': {'$max': '$order_purchase_timestamp'}
            }}
        ])
        
        for doc in date_range:
            print(f"📅 Rango de fechas: {doc['min_date'].strftime('%Y-%m-%d')} a {doc['max_date'].strftime('%Y-%m-%d')}")
        
        # 3. Verificar integridad referencial básica
        order_count = db.orders.count_documents({})
        payment_count = db.payments.count_documents({})
        print(f"🔗 Órdenes: {order_count:,}, Pagos: {payment_count:,}")
        
        logger.info("✅ Validación completada exitosamente")
        return True
        
    except Exception as e:
        logger.error(f"❌ Error en validación: {str(e)}")
        return False
    
    finally:
        if client:
            client.close()

def run_sample_queries():
    """
    Ejecuta consultas de ejemplo para demostrar funcionalidad
    """
    logger.info("🔍 Ejecutando consultas de ejemplo...")
    
    client, db = get_mongodb_connection()
    
    if db is None:
        logger.error("❌ No se pudo conectar a MongoDB")
        return
    
    try:
        print("🔍 CONSULTAS DE EJEMPLO EN MONGODB")
        print("="*50)
        
        # 1. Top 5 estados con más clientes
        print("1️⃣ Top 5 Estados con Más Clientes:")
        top_states = db.customers.aggregate([
            {'$group': {'_id': '$customer_state', 'count': {'$sum': 1}}},
            {'$sort': {'count': -1}},
            {'$limit': 5}
        ])
        
        for state in top_states:
            print(f"   {state['_id']}: {state['count']:,} clientes")
        
        # 2. Promedio de satisfacción por mes
        print("\\n2️⃣ Promedio de Satisfacción por Mes (últimos 6 meses):")
        monthly_satisfaction = db.reviews.aggregate([
            {'$match': {'review_score': {'$exists': True}}},
            {'$group': {
                '_id': {
                    'year': {'$year': '$review_creation_date'},
                    'month': {'$month': '$review_creation_date'}
                },
                'avg_score': {'$avg': '$review_score'},
                'count': {'$sum': 1}
            }},
            {'$sort': {'_id.year': -1, '_id.month': -1}},
            {'$limit': 6}
        ])
        
        for month in monthly_satisfaction:
            year = month['_id']['year']
            month_num = month['_id']['month']
            avg_score = month['avg_score']
            count = month['count']
            print(f"   {year}-{month_num:02d}: {avg_score:.2f} ⭐ ({count} reviews)")
        
        # 3. Top 5 categorías por revenue
        print("\\n3️⃣ Top 5 Categorías por Revenue:")
        category_revenue = db.order_items.aggregate([
            {'$lookup': {
                'from': 'products',
                'localField': 'product_id',
                'foreignField': 'product_id',
                'as': 'product_info'
            }},
            {'$unwind': '$product_info'},
            {'$group': {
                '_id': '$product_info.product_category_name_english',
                'total_revenue': {'$sum': '$price'},
                'total_orders': {'$sum': 1}
            }},
            {'$sort': {'total_revenue': -1}},
            {'$limit': 5}
        ])
        
        for category in category_revenue:
            name = category['_id'] or 'Unknown'
            revenue = category['total_revenue']
            orders = category['total_orders']
            print(f"   {name}: R$ {revenue:,.2f} ({orders:,} items)")
        
        # 4. Distribución de métodos de pago
        print("\\n4️⃣ Distribución de Métodos de Pago:")
        payment_distribution = db.payments.aggregate([
            {'$group': {
                '_id': '$payment_type',
                'count': {'$sum': 1},
                'total_value': {'$sum': '$payment_value'}
            }},
            {'$sort': {'count': -1}}
        ])
        
        for payment in payment_distribution:
            method = payment['_id']
            count = payment['count']
            value = payment['total_value']
            print(f"   {method}: {count:,} transacciones (R$ {value:,.2f})")
        
        logger.info("✅ Consultas de ejemplo completadas")
        
    except Exception as e:
        logger.error(f"❌ Error ejecutando consultas: {str(e)}")
    
    finally:
        if client:
            client.close()

# Funciones de utilidad para consultas
print("🛠️ FUNCIONES DE VALIDACIÓN Y CONSULTAS LISTAS")
print("📝 Para usar:")
print("   - validate_mongodb_data() : Valida los datos cargados")
print("   - run_sample_queries() : Ejecuta consultas de ejemplo")

🛠️ FUNCIONES DE VALIDACIÓN Y CONSULTAS LISTAS
📝 Para usar:
   - validate_mongodb_data() : Valida los datos cargados
   - run_sample_queries() : Ejecuta consultas de ejemplo


In [20]:
# Ejecutar transformaciones
print("🔄 Ejecutando transformaciones de datos...")
transformed_data = transform_all_data(raw_data)
print("✅ Transformaciones completadas")

2025-08-04 15:09:20,165 - INFO - 🔄 Iniciando transformación completa de datos...
2025-08-04 15:09:20,167 - INFO - 🔄 Transformando datos de clientes...
2025-08-04 15:09:20,167 - INFO - 🔄 Transformando datos de clientes...


🔄 Ejecutando transformaciones de datos...


2025-08-04 15:09:20,461 - INFO - ✅ Clientes transformados: 99,441 registros
2025-08-04 15:09:20,462 - INFO - 🔄 Transformando datos de órdenes...
2025-08-04 15:09:20,462 - INFO - 🔄 Transformando datos de órdenes...
2025-08-04 15:09:24,537 - INFO - ✅ Órdenes transformadas: 99,441 registros
2025-08-04 15:09:24,538 - INFO - 🔄 Transformando datos de productos...
2025-08-04 15:09:24,537 - INFO - ✅ Órdenes transformadas: 99,441 registros
2025-08-04 15:09:24,538 - INFO - 🔄 Transformando datos de productos...
2025-08-04 15:09:24,573 - INFO - ✅ Productos transformados: 32,951 registros
2025-08-04 15:09:24,575 - INFO - 🔄 Transformando datos de items...
2025-08-04 15:09:24,573 - INFO - ✅ Productos transformados: 32,951 registros
2025-08-04 15:09:24,575 - INFO - 🔄 Transformando datos de items...
2025-08-04 15:09:24,599 - INFO - ✅ Items transformados: 112,650 registros
2025-08-04 15:09:24,601 - INFO - 🔄 Transformando datos de pagos...
2025-08-04 15:09:24,599 - INFO - ✅ Items transformados: 112,650 r

✅ Transformaciones completadas


## 📋 RESUMEN DEL PROCESO ETL CREADO

### ✅ **Pipeline ETL Completo Configurado**

Tu proceso ETL está listo para ejecutar y incluye:

#### 🔧 **Funcionalidades Implementadas:**

**1. 📥 EXTRACCIÓN:**
- ✅ Carga automática de 9 datasets CSV
- ✅ Manejo de errores y logging
- ✅ Validación de archivos

**2. 🔧 TRANSFORMACIÓN:**
- ✅ Limpieza y estandarización de datos
- ✅ Conversión de tipos de datos
- ✅ Creación de campos derivados
- ✅ Categorización de variables
- ✅ Cálculo de métricas de negocio

**3. 📤 CARGA A MONGODB:**
- ✅ Conexión automática a MongoDB
- ✅ Creación de colecciones optimizadas
- ✅ Índices para consultas rápidas
- ✅ Carga en lotes para eficiencia
- ✅ Manejo de tipos de datos MongoDB

**4. ✅ VALIDACIÓN:**
- ✅ Verificación de integridad de datos
- ✅ Consultas de ejemplo
- ✅ Estadísticas de carga

#### 🚀 **Para Ejecutar el ETL:**

```python
# 1. Asegúrate de que MongoDB esté ejecutándose
# 2. Ejecuta el pipeline completo:
etl_success = run_complete_etl_pipeline()

# 3. Valida los resultados:
validate_mongodb_data()

# 4. Ejecuta consultas de ejemplo:
run_sample_queries()
```

#### 📊 **Colecciones en MongoDB:**
- `customers` - Datos de clientes con ubicación
- `orders` - Órdenes con métricas temporales  
- `products` - Productos con categorías en inglés
- `order_items` - Items con métricas de precio
- `payments` - Pagos categorizados
- `reviews` - Reseñas con análisis de satisfacción
- `sellers` - Información de vendedores
- `geolocation` - Datos de geolocalización

#### 🔍 **Transformaciones Aplicadas:**
- **Fechas**: Conversión a datetime y campos derivados
- **Categorías**: Traducción al inglés y limpieza
- **Métricas**: Cálculo de tiempos de entrega, satisfacción, etc.
- **Geolocalización**: Estandarización de ciudades y estados
- **Precios**: Categorización y métricas de valor

¡Tu proceso ETL está listo para usar con MongoDB! 🎉

In [28]:
# 🚀 ETL OPTIMIZADO PARA GRANDES VOLÚMENES
print("🚀 EJECUTANDO ETL OPTIMIZADO")
print("="*40)

def load_optimized_etl():
    """
    Versión optimizada del ETL con mejor manejo de memoria
    """
    try:
        # Conexión directa
        client = MongoClient('mongodb://localhost:27017/')
        db = client['olist_ecommerce']
        
        print("✅ Conexión establecida")
        
        # Mapeo de colecciones con tamaños de lote optimizados
        collections_config = {
            'customers': ('customers', 500),
            'sellers': ('sellers', 1000),
            'products': ('products', 500),
            'orders': ('orders', 300),
            'order_items': ('order_items', 200),
            'payments': ('payments', 300),
            'reviews': ('reviews', 300),
            'geolocation': ('geolocation', 100)  # La más grande, lotes pequeños
        }
        
        total_loaded = 0
        
        for dataset_name, (collection_name, batch_size) in collections_config.items():
            if dataset_name in transformed_data:
                print(f"\\n📤 Cargando {dataset_name}...")
                
                df = transformed_data[dataset_name]
                records = df.to_dict('records')
                
                # Limpiar datos problemáticos
                for record in records:
                    for key, value in record.items():
                        if pd.isna(value):
                            record[key] = None
                        elif isinstance(value, (np.integer, np.floating)):
                            record[key] = value.item()
                
                # Limpiar colección
                db[collection_name].drop()
                
                # Insertar en lotes pequeños
                for i in range(0, len(records), batch_size):
                    batch = records[i:i + batch_size]
                    db[collection_name].insert_many(batch)
                    
                    if i % (batch_size * 5) == 0:
                        print(f"   📊 Progreso: {i + len(batch):,}/{len(records):,}")
                
                count = db[collection_name].count_documents({})
                total_loaded += count
                print(f"   ✅ {collection_name}: {count:,} documentos")
        
        print(f"\\n🎉 ETL COMPLETADO EXITOSAMENTE!")
        print(f"📊 Total de documentos cargados: {total_loaded:,}")
        
        # Crear índices
        print("\\n🔧 Creando índices...")
        db.orders.create_index("customer_id")
        db.order_items.create_index("order_id") 
        db.payments.create_index("order_id")
        db.reviews.create_index("order_id")
        print("✅ Índices creados")
        
        client.close()
        return True
        
    except Exception as e:
        print(f"❌ Error: {str(e)}")
        return False

# Ejecutar ETL optimizado
success = load_optimized_etl()

🚀 EJECUTANDO ETL OPTIMIZADO
✅ Conexión establecida
\n📤 Cargando customers...
   📊 Progreso: 500/99,441
   📊 Progreso: 3,000/99,441
   📊 Progreso: 5,500/99,441
   📊 Progreso: 8,000/99,441
   📊 Progreso: 10,500/99,441
   📊 Progreso: 13,000/99,441
   📊 Progreso: 15,500/99,441
   📊 Progreso: 18,000/99,441
   📊 Progreso: 20,500/99,441
   📊 Progreso: 23,000/99,441
   📊 Progreso: 25,500/99,441
   📊 Progreso: 28,000/99,441
   📊 Progreso: 30,500/99,441
   📊 Progreso: 33,000/99,441
   📊 Progreso: 35,500/99,441
   📊 Progreso: 38,000/99,441
   📊 Progreso: 40,500/99,441
   📊 Progreso: 43,000/99,441
   📊 Progreso: 45,500/99,441
   📊 Progreso: 48,000/99,441
   📊 Progreso: 50,500/99,441
   📊 Progreso: 53,000/99,441
   📊 Progreso: 55,500/99,441
   📊 Progreso: 58,000/99,441
   📊 Progreso: 60,500/99,441
   📊 Progreso: 63,000/99,441
   📊 Progreso: 65,500/99,441
   📊 Progreso: 68,000/99,441
   📊 Progreso: 70,500/99,441
   📊 Progreso: 73,000/99,441
   📊 Progreso: 75,500/99,441
   📊 Progreso: 78,000/99,441
 

In [29]:
# 🎉 VALIDACIÓN FINAL DEL ETL
print("🎉 VALIDACIÓN FINAL DEL PROCESO ETL")
print("="*50)

try:
    client = MongoClient('mongodb://localhost:27017/')
    db = client['olist_ecommerce']
    
    # Verificar todas las colecciones
    collections = ['customers', 'sellers', 'products', 'orders', 
                  'order_items', 'payments', 'reviews', 'geolocation']
    
    total_docs = 0
    print("📊 RESUMEN DE COLECCIONES CARGADAS:")
    print("-" * 50)
    
    for collection_name in collections:
        count = db[collection_name].count_documents({})
        total_docs += count
        print(f"📋 {collection_name:15}: {count:,} documentos")
    
    print("-" * 50)
    print(f"📊 TOTAL DOCUMENTOS: {total_docs:,}")
    
    # Consultas de validación
    print("\\n🔍 CONSULTAS DE VALIDACIÓN:")
    print("-" * 30)
    
    # 1. Top 3 estados con más clientes
    print("1️⃣ Top 3 Estados con Más Clientes:")
    top_states = list(db.customers.aggregate([
        {'$group': {'_id': '$customer_state', 'count': {'$sum': 1}}},
        {'$sort': {'count': -1}},
        {'$limit': 3}
    ]))
    
    for state in top_states:
        print(f"   {state['_id']}: {state['count']:,} clientes")
    
    # 2. Total revenue
    print("\\n2️⃣ Revenue Total:")
    total_revenue = list(db.payments.aggregate([
        {'$group': {'_id': None, 'total': {'$sum': '$payment_value'}}}
    ]))
    
    if total_revenue:
        print(f"   💰 R$ {total_revenue[0]['total']:,.2f}")
    
    # 3. Promedio de satisfacción
    print("\\n3️⃣ Promedio de Satisfacción:")
    avg_satisfaction = list(db.reviews.aggregate([
        {'$group': {'_id': None, 'avg_score': {'$avg': '$review_score'}}}
    ]))
    
    if avg_satisfaction:
        print(f"   ⭐ {avg_satisfaction[0]['avg_score']:.2f}/5.0")
    
    print("\\n🎉 ¡ETL COMPLETADO Y VALIDADO EXITOSAMENTE!")
    print("✅ Todos los datos están disponibles en MongoDB")
    print("🗄️ Base de datos: 'olist_ecommerce'")
    print("🔗 Puedes conectarte con cualquier cliente MongoDB")
    
    client.close()
    
except Exception as e:
    print(f"❌ Error en validación: {str(e)}")

print("\\n" + "="*50)
print("🎯 PROCESO ETL FINALIZADO")
print("="*50)

🎉 VALIDACIÓN FINAL DEL PROCESO ETL
📊 RESUMEN DE COLECCIONES CARGADAS:
--------------------------------------------------
📋 customers      : 99,441 documentos
📋 sellers        : 3,095 documentos
📋 products       : 32,951 documentos
📋 orders         : 99,441 documentos
📋 order_items    : 112,650 documentos
📋 payments       : 103,886 documentos
📋 reviews        : 99,224 documentos
📋 geolocation    : 1,000,163 documentos
--------------------------------------------------
📊 TOTAL DOCUMENTOS: 1,550,851
\n🔍 CONSULTAS DE VALIDACIÓN:
------------------------------
1️⃣ Top 3 Estados con Más Clientes:
   SP: 41,746 clientes
   RJ: 12,852 clientes
   MG: 11,635 clientes
\n2️⃣ Revenue Total:
   💰 R$ 16,008,872.12
\n3️⃣ Promedio de Satisfacción:
   ⭐ 4.09/5.0
\n🎉 ¡ETL COMPLETADO Y VALIDADO EXITOSAMENTE!
✅ Todos los datos están disponibles en MongoDB
🗄️ Base de datos: 'olist_ecommerce'
🔗 Puedes conectarte con cualquier cliente MongoDB
🎯 PROCESO ETL FINALIZADO
