 ============================================================================
# EJEMPLO: Pandas vs Spark - Limitaciones y Escalabilidad
 ============================================================================
### Pandas vs Spark: Cuando Pandas se queda corto

Este notebook demuestra:
1. Las limitaciones de memoria de Pandas
2. Cómo Spark escala el mismo problema
3. Diferencias en procesamiento y performance

In [0]:
import pandas as pd
import numpy as np
from pyspark.sql import functions as F
from pyspark.sql.types import *
import time

## Using Spark to process the data

In [0]:
print("=" * 80)
print("GENERANDO DATASET DE EJEMPLO: 100 millones de registros de ventas")
print("=" * 80)

# Definir esquema para Spark
schema = StructType([
    StructField("transaction_id", LongType(), False),
    StructField("customer_id", IntegerType(), False),
    StructField("product_id", IntegerType(), False),
    StructField("category", StringType(), False),
    StructField("amount", DoubleType(), False),
    StructField("quantity", IntegerType(), False),
    StructField("timestamp", TimestampType(), False),
    StructField("region", StringType(), False)
])


In [0]:
num_records = 100_000_000  # 100 millones de registros
categories = ["Electronics", "Clothing", "Food", "Books", "Sports"]
regions = ["North", "South", "East", "West", "Central"]

print(f"\nCreando {num_records:,} registros con Spark...")
start_time = time.time()

In [0]:
df_spark = (spark.range(num_records)
    .withColumn("transaction_id", F.col("id"))
    .withColumn("customer_id", (F.rand() * 1_000_000).cast("int"))
    .withColumn("product_id", (F.rand() * 10_000).cast("int"))
    .withColumn("category", F.array(*[F.lit(c) for c in categories])[
        (F.rand() * len(categories)).cast("int")])
    .withColumn("amount", F.round(F.rand() * 1000 + 10, 2))
    .withColumn("quantity", (F.rand() * 10 + 1).cast("int"))
    .withColumn("timestamp", F.expr("date_sub(current_timestamp(), cast(rand() * 365 as int))"))
    .withColumn("region", F.array(*[F.lit(r) for r in regions])[
        (F.rand() * len(regions)).cast("int")])
    .drop("id")
)

In [0]:
# df_spark.cache()
count_spark = df_spark.count()

elapsed = time.time() - start_time
print(f"✓ Dataset creado: {count_spark:,} registros en {elapsed:.2f} segundos")


## Using pandas to process the data

In [0]:
print("\n" + "=" * 80)
print("CASO 1: INTENTANDO CARGAR TODO EN PANDAS")
print("=" * 80)

try:
    # Intentar cargar una porción grande en Pandas
    sample_size = 10_000_000  # 10 millones (solo el 10% del dataset)
    
    print(f"\nIntentando cargar {sample_size:,} registros en Pandas...")
    print("⚠️  ADVERTENCIA: Esto consumirá mucha memoria RAM")
    
    start_time = time.time()
    
    # Convertir muestra a Pandas
    df_pandas = df_spark.limit(sample_size).toPandas()
    
    elapsed = time.time() - start_time
    # deep true nos da el uso real de memoria al considerar objetos object
    memory_mb = df_pandas.memory_usage(deep=True).sum() / 1024 / 1024
    
    print(f"✓ Cargado en {elapsed:.2f} segundos")
    print(f"📊 Memoria utilizada: {memory_mb:,.2f} MB")
    print(f"⚠️  Esto es solo el {(sample_size/num_records)*100:.1f}% del dataset total")
    print(f"⚠️  Cargar el 100% requeriría ~{(memory_mb * num_records / sample_size):,.0f} MB")
    print("\n💡 PROBLEMA DE PANDAS:")
    print("   - Todo debe caber en la memoria RAM de un solo nodo")
    print("   - No hay paralelización distribuida")
    print("   - Escalar requiere hardware más grande (scale-up)")
    
except MemoryError as e:
    print("❌ ERROR: MemoryError - No hay suficiente RAM!")
    print(f"   {str(e)}")
except Exception as e:
    print(f"❌ ERROR: {str(e)}")


 ============================================================================
## OPERACIONES COMPLEJAS
 ============================================================================


In [0]:
print("\n" + "=" * 80)
print("CASO 2: AGREGACIONES COMPLEJAS")
print("=" * 80)

# Operación: Análisis de ventas por región, categoría y mes
print("\n📊 Análisis: Ventas totales y promedio por región, categoría y mes")


In [0]:
# --- Con Pandas (en la muestra) ---
if 'df_pandas' in locals():
    print("\n[PANDAS] Procesando 10M registros...")
    start_time = time.time()
    
    df_pandas['year_month'] = pd.to_datetime(df_pandas['timestamp']).dt.to_period('M')
    
    result_pandas = (df_pandas
        .groupby(['region', 'category', 'year_month'])
        .agg({
            'amount': ['sum', 'mean', 'count'],
            'quantity': 'sum'
        })
        .reset_index()
    )
    
    elapsed_pandas = time.time() - start_time
    print(f"✓ Tiempo: {elapsed_pandas:.2f} segundos")
    print(f"✓ Registros resultantes: {len(result_pandas):,}")

In [0]:

# --- Con Spark (dataset completo) ---
print("\n[SPARK] Procesando 100M registros (10x más datos)...")
start_time = time.time()

result_spark = (df_spark
    .withColumn("year_month", F.date_format("timestamp", "yyyy-MM"))
    .groupBy("region", "category", "year_month")
    .agg(
        F.sum("amount").alias("total_amount"),
        F.avg("amount").alias("avg_amount"),
        F.count("*").alias("count_transactions"),
        F.sum("quantity").alias("total_quantity")
    )
    .orderBy("region", "category", "year_month")
)

In [0]:
result_count = result_spark.count()
elapsed_spark = time.time() - start_time

print(f"✓ Tiempo: {elapsed_spark:.2f} segundos")
print(f"✓ Registros resultantes: {result_count:,}")

In [0]:
# locals() es una función de python que nos permite inspeccionar las variables locales creadas
# hasta el momento, globals() nos da acceso a las variables globales

print("\n📈 COMPARACIÓN:")
if 'elapsed_pandas' in locals():
    print(f"   Pandas:  {elapsed_pandas:.2f}s para 10M registros")
    print(f"   Spark:   {elapsed_spark:.2f}s para 100M registros (10x más datos)")
    speedup = (elapsed_pandas * 10) / elapsed_spark
    print(f"   🚀 Spark es ~{speedup:.1f}x más rápido en datos proporcionales")

# Mostrar muestra de resultados
print("\n📋 Muestra de resultados:")
result_spark.show(10, truncate=False)

====================================================
## Comparando joins
====================================================

In [0]:
print("\n" + "=" * 80)
print("JOIN DE GRANDES DATASETS")
print("=" * 80)

print("\nCreando tabla de clientes (5M registros)...")
df_customers = (spark.range(5_000_000)
    .withColumn("customer_id", F.col("id").cast("int"))
    .withColumn("customer_name", F.concat(F.lit("Customer_"), F.col("id")))
    .withColumn("customer_segment", 
        F.when(F.rand() < 0.3, "Premium")
        .when(F.rand() < 0.6, "Standard")
        .otherwise("Basic"))
    .withColumn("registration_date", 
        F.expr("date_sub(current_timestamp(), cast(rand() * 1000 as int))"))
    .drop("id")
)

In [0]:
print("\n[SPARK] Join de 100M transacciones con 5M clientes...")
start_time = time.time()

df_enriched = (df_spark
    .join(df_customers, "customer_id", "left")
    .select(
        "transaction_id",
        "customer_id",
        "customer_name",
        "customer_segment",
        "category",
        "amount",
        "region"
    )
)

In [0]:
# Calcular métricas por segmento de cliente
segment_analysis = (df_enriched
    .groupBy("customer_segment", "category")
    .agg(
        F.sum("amount").alias("total_sales"),
        F.avg("amount").alias("avg_transaction"),
        F.countDistinct("customer_id").alias("unique_customers")
    )
    .orderBy(F.desc("total_sales"))
)

result_count = segment_analysis.count()
elapsed = time.time() - start_time

print(f"✓ Join completado en {elapsed:.2f} segundos")
print(f"✓ Registros analizados: {result_count:,}")

print("\n📋 Análisis por segmento de cliente:")
segment_analysis.show(15, truncate=False)

In [0]:
print("\n💡 POR QUÉ SPARK ESCALA:")
print("   ✓ Procesamiento distribuido en múltiples nodos")
print("   ✓ Lazy evaluation: optimiza el plan de ejecución")
print("   ✓ Datos particionados: trabaja con chunks en paralelo")
print("   ✓ Shuffle optimization para joins eficientes")
print("   ✓ Spill to disk: usa disco si se queda sin RAM")

In [0]:
# --- INTENTO CON PANDAS (muestra pequeña) ---
print("\n" + "-" * 80)
print("INTENTO CON PANDAS (muestra reducida)")
print("-" * 80)

# Usamos muestras mucho más pequeñas para Pandas
pandas_sample_transactions = 500_000  # Solo 500K transacciones
pandas_sample_customers = 100_000     # Solo 100K clientes

print(f"\n⚠️  Usando muestra reducida para Pandas:")
print(f"   - Transacciones: {pandas_sample_transactions:,} (0.5% del total)")
print(f"   - Clientes: {pandas_sample_customers:,} (2% del total)")
print(f"   - Esto es necesario por limitaciones de memoria de Pandas")

try:
    print("\n[PANDAS] Convirtiendo datos a Pandas...")
    start_time = time.time()
    
    # Convertir muestras a Pandas
    df_transactions_pandas = df_spark.limit(pandas_sample_transactions).toPandas()
    df_customers_pandas = df_customers.limit(pandas_sample_customers).toPandas()
    
    conversion_time = time.time() - start_time
    print(f"✓ Conversión completada en {conversion_time:.2f} segundos")
    
    # Calcular memoria usada
    memory_transactions = df_transactions_pandas.memory_usage(deep=True).sum() / 1024 / 1024
    memory_customers = df_customers_pandas.memory_usage(deep=True).sum() / 1024 / 1024
    total_memory = memory_transactions + memory_customers
    
    print(f"📊 Memoria utilizada:")
    print(f"   - Transacciones: {memory_transactions:,.2f} MB")
    print(f"   - Clientes: {memory_customers:,.2f} MB")
    print(f"   - Total: {total_memory:,.2f} MB")
    
    # Proyección de memoria para dataset completo
    projected_memory = (memory_transactions * (num_records / pandas_sample_transactions)) + \
                       (memory_customers * (5_000_000 / pandas_sample_customers))
    print(f"⚠️  Memoria proyectada para dataset completo: ~{projected_memory:,.0f} MB ({projected_memory/1024:.1f} GB)")
    
    print("\n[PANDAS] Ejecutando JOIN...")
    start_time = time.time()
    
    # Realizar el join en Pandas
    df_enriched_pandas = df_transactions_pandas.merge(
        df_customers_pandas,
        on='customer_id',
        how='left'
    )
    
    # Calcular métricas por segmento
    segment_analysis_pandas = (df_enriched_pandas
        .groupby(['customer_segment', 'category'])
        .agg({
            'amount': ['sum', 'mean'],
            'customer_id': 'nunique'
        })
        .reset_index()
    )
    
    # Renombrar columnas para consistencia
    segment_analysis_pandas.columns = [
        'customer_segment', 'category', 
        'total_sales', 'avg_transaction', 'unique_customers'
    ]
    
    # Ordenar por ventas totales
    segment_analysis_pandas = segment_analysis_pandas.sort_values(
        'total_sales', ascending=False
    )
    
    elapsed_pandas = time.time() - start_time
    
    print(f"✓ JOIN completado en {elapsed_pandas:.2f} segundos")
    print(f"✓ Registros después del join: {len(df_enriched_pandas):,}")
    print(f"✓ Grupos analizados: {len(segment_analysis_pandas):,}")
    
    print("\n📋 Primeros resultados (Pandas):")
    print(segment_analysis_pandas.head(10).to_string(index=False))
    
    # Proyectar tiempo para dataset completo
    projected_time = elapsed_pandas * (num_records / pandas_sample_transactions)
    print(f"\n⏱️  Tiempo proyectado para dataset completo: ~{projected_time:.0f} segundos ({projected_time/60:.1f} minutos)")
    
except MemoryError as e:
    print("❌ ERROR: MemoryError durante el JOIN!")
    print(f"   Pandas no puede manejar este volumen de datos")
    print(f"   Incluso la muestra reducida es demasiado grande")
    elapsed_pandas = None
except Exception as e:
    print(f"❌ ERROR: {str(e)}")
    elapsed_pandas = None


RESUMEN: CUÁNDO USAR CADA HERRAMIENTA

┌─────────────────┬──────────────────────────┬──────────────────────────┐
│   CRITERIO      │         PANDAS           │          SPARK           │
├─────────────────┼──────────────────────────┼──────────────────────────┤
│ Tamaño datos    │ < 10 GB                  │ > 10 GB a Petabytes      │
│ Memoria         │ Todo debe caber en RAM   │ Procesa por particiones  │
│ Escalabilidad   │ Vertical (scale-up)      │ Horizontal (scale-out)   │
│ Velocidad       │ Rápido en datos pequeños │ Optimizado para Big Data │
│ Facilidad uso   │ API simple e intuitiva   │ Curva de aprendizaje     │
│ Iteración       │ Interactivo, inmediato   │ Lazy evaluation          │
│ Mejor para      │ Análisis exploratorio    │ ETL y producción         │
│                 │ Prototipado rápido       │ Pipelines distribuidos   │
└─────────────────┴──────────────────────────┴──────────────────────────┘

🎯 LECCIONES CLAVE:

1. LÍMITES DE MEMORIA
   - Pandas: Si tus datos no caben en RAM, estás en problemas
   - Spark: Divide y conquista con procesamiento distribuido

2. ESCALABILIDAD
   - Pandas: Agregar RAM es caro y tiene límites físicos
   - Spark: Agregar nodos es lineal y prácticamente ilimitado

3. PERFORMANCE
   - Pandas: Excelente para < 1GB, single-threaded principalmente
   - Spark: Diseñado para TB/PB con paralelización automática

4. CUÁNDO USAR QUÉ
   - Usa Pandas: Datasets < 10GB, análisis rápido, notebooks
   - Usa Spark: Datasets > 10GB, ETL, producción, múltiples fuentes

5. ESTRATEGIA HÍBRIDA
   - Procesa con Spark, analiza muestras con Pandas
   - Usa Spark SQL para filtrar, luego .toPandas() en resultados

✅ Notebook completado. Compara los tiempos y memoria utilizados.
💡 Experimenta cambiando num_records para ver el impacto en Pandas vs Spark\n