In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast
import time

# =====================
# Crear DataFrames simulados
# =====================

# Tabla grande (~10000000 registros)
df_grande = spark.range(0, 10000000).repartition(2).withColumnRenamed("id", "cliente_id")

# Tabla pequeña (~100 registros)
data_pequena = [(i, f"Segmento-{i%10}") for i in range(10)]
df_pequena = spark.createDataFrame(data_pequena, ["cliente_id", "segmento"]).coalesce(1)

# =====================
# 1. Join normal (sin optimización)
# =====================
print("🔁 Join normal con PySpark:")
t1 = time.time()
df_join_normal = df_grande.join(df_pequena, "cliente_id")
df_join_normal.count()
t2 = time.time()
print(f"⏱ Tiempo sin broadcast (PySpark): {t2 - t1:.2f} segundos")

# =====================
# 2. Join con broadcast (PySpark)
# =====================
print("🚀 Join con broadcast (PySpark):")
t3 = time.time()
df_join_broadcast = df_grande.join(broadcast(df_pequena), "cliente_id")
df_join_broadcast.count()
t4 = time.time()
print(f"⏱ Tiempo con broadcast (PySpark): {t4 - t3:.2f} segundos")

# =====================
# 3. Join normal con SQL
# =====================

# Registrar en SQL
df_grande.createOrReplaceTempView("clientes")
df_pequena.createOrReplaceTempView("segmentos")

print("🧠 Join normal con SQL:")
t5 = time.time()
df_sql_normal = spark.sql("""
    SELECT *
    FROM clientes c
    JOIN segmentos s
    ON c.cliente_id = s.cliente_id
""")
df_sql_normal.count()
t6 = time.time()
print(f"⏱ Tiempo sin broadcast (SQL): {t6 - t5:.2f} segundos")

# =====================
# 4. Join con broadcast en SQL (usando hint)
# =====================
print("⚡ Join con broadcast en SQL:")
t7 = time.time()
df_sql_broadcast = spark.sql("""
    SELECT /*+ BROADCAST(s) */
           *
    FROM clientes c
    JOIN segmentos s
    ON c.cliente_id = s.cliente_id
""")
df_sql_broadcast.count()
t8 = time.time()
print(f"⏱ Tiempo con broadcast (SQL): {t8 - t7:.2f} segundos")

# =====================
# Comparación final
# =====================
print("\n📊 Comparativa de tiempos:")
print(f"- Join normal (PySpark):     {t2 - t1:.2f} s")
print(f"- Join con broadcast (PySpark): {t4 - t3:.2f} s")
print(f"- Join normal (SQL):         {t6 - t5:.2f} s")
print(f"- Join con broadcast (SQL):  {t8 - t7:.2f} s")

# 🚀 Ejercicio – Optimización de Joins con Broadcast y Particionamiento

## 🎯 Objetivo
Comparar el rendimiento de un `join` tradicional versus un `broadcast join` en PySpark, utilizando un dataframe grande distribuido entre workers y un dataframe pequeño replicado. Reflexionar sobre los beneficios de usar broadcast en casos adecuados.

---

## 📁 Dataset

Vas a trabajar con dos datasets simulados:

- `df_grande`: un DataFrame grande (100.000 registros) con una sola columna `cliente_id`.
- `df_pequeno`: un DataFrame pequeño (100 registros) con columnas `cliente_id` y `segmento`.

Ambos comparten la clave `cliente_id`.

---

## 🛠 Actividades

### 1️⃣ Crear ambos DataFrames

```python
from pyspark.sql.functions import broadcast

# Dataset grande (distribuido en 2 particiones)
df_grande = spark.range(0, 100000).repartition(2).withColumnRenamed("id", "cliente_id")

# Dataset pequeño (simula catálogo de segmentos)
data_pequeno = [(i, f"Segmento-{i%10}") for i in range(100)]
df_pequeno = spark.createDataFrame(data_pequeno, ["cliente_id", "segmento"])
```

### 2️⃣ Realizar un join normal (sin broadcast)

### 3️⃣ Realizar un broadcast join

### 4️⃣ Visualizar el plan de ejecución (opcional)

```python
df_join_broadcast.explain(extended=True)
```
