# Sesion 2 - Técnicas de Optimización en Spark y Databricks

## Bloque 1 – Buenas prácticas de código Spark

Antes de pensar en configuraciones avanzadas, la **primera fuente de optimización** está en cómo escribimos el código Spark.

Malas prácticas → pipelines lentos, costosos e inestables.  
Buenas prácticas → rendimiento más alto, menos shuffles, menos memoria usada.

En este bloque veremos:
- Selección de columnas vs `select *`.
- Pushdown de filtros (filtrar lo antes posible).
- Evitar operaciones caras innecesarias (joins, collect, count repetidos).
- Uso consciente de cache/persist.
- Reutilización de DataFrames vs recomputación.

### 1. Selección de columnas

❌ Mala práctica: `select *` en datasets grandes.  
✅ Buena práctica: seleccionar solo las columnas necesarias.

Esto impacta en:
- **E/S de datos** → menos lectura de disco.  
- **Memoria** → menos columnas almacenadas en RAM.  
- **Shuffles** → menos datos transferidos entre nodos.

#### Ejemplo 1

In [0]:
from pyspark.sql import functions as F

# Dataset de ejemplo
df_vuelos = (
    spark.range(0, 10_000_000)
    .withColumn("origen", F.when(F.col("id") % 3 == 0, "MAD").otherwise("BCN"))
    .withColumn("destino", F.when(F.col("id") % 5 == 0, "JFK").otherwise("CDG"))
    .withColumn("delay", (F.rand() * 300).cast("int"))
)


In [0]:
# ❌ select * (todas las columnas)
print("Select *")
%time print(df_vuelos.select("*").count())

# ✅ solo columnas necesarias
print("Select columnas necesarias")
%time print(df_vuelos.select("id", "delay").count())

### 2. Pushdown de filtros (filtrar antes, no después)

❌ Mala práctica: aplicar filtros tarde (después de joins, groupBy, etc.).  
✅ Buena práctica: filtrar lo antes posible.

Beneficios:
- Menos filas procesadas en etapas posteriores.
- Menos datos que mover entre particiones.
- Queries más rápidas.

In [0]:
from pyspark.sql import functions as F

# Dataset de ejemplo
df_vuelos = spark.range(0, 50_000_000).withColumn("origen", (F.rand()*10).cast("int")) \
                                      .withColumn("delay", (F.rand()*500).cast("int"))

In [0]:
# ❌ Filtro tarde (después del groupBy)
df_tarde = df_vuelos.groupBy("origen").count().filter("count > 1000000")

# Ejecución y tiempos
print("Filtro tarde:")
%time df_tarde.collect()

In [0]:
# ✅ Filtro temprano (antes del groupBy)
df_temprano = df_vuelos.filter("delay > 200").groupBy("origen").count()

print("Filtro temprano:")
%time df_temprano.collect()

In [0]:
print("=== Plan Filtro tarde ===")
df_tarde.explain("extended")

In [0]:
print("=== Plan Filtro temprano ===")
df_temprano.explain("extended")

- En `df_tarde` (filtro tarde):
El filtro aparece después del Aggregate. Spark primero hace el groupBy, después reduce los resultados → mucho más costoso.

- En `df_temprano` (filtro temprano):
El filtro aparece antes del Aggregate. Spark reduce los datos antes del shuffle → más eficiente.

### 3. Joins optimizados


Los joins pueden ser el mayor cuello de botella.

- **Shuffle join (por defecto)** → mueve datos entre nodos, costoso.
- **Broadcast join** → si una tabla cabe en memoria, se copia a todos los nodos. Muy rápido.
- **Cross join** → prohibido salvo casos muy controlados.

Regla: siempre que tengas una tabla pequeña, usa `broadcast()`.

In [0]:
from pyspark.sql import functions as F

# Dataset grande de vuelos
df_vuelos = spark.range(0, 20_000_000) \
    .withColumn("origen", (F.rand()*10).cast("int")) \
    .withColumn("delay", (F.rand()*500).cast("int"))

# Dataset pequeño de aeropuertos
df_aeropuertos = spark.createDataFrame(
    [
        ("0", "MAD"), ("1", "BCN"), ("2", "JFK"),
        ("3", "CDG"), ("4", "MEX"), ("5", "SCL")
    ],
    ["codigo", "ciudad"]
).withColumn("codigo", F.col("codigo").cast("int"))

In [0]:
# ❌ Join normal (shuffle join)
df_join_normal = df_vuelos.join(df_aeropuertos, df_vuelos.origen == df_aeropuertos.codigo)

# Ejecución
print("Join normal (con shuffle):")
%time df_join_normal.count()

In [0]:
# ✅ Join con broadcast (spark replica df_aeropuertos en cada executor)
df_join_broadcast = df_vuelos.join(F.broadcast(df_aeropuertos), df_vuelos.origen == df_aeropuertos.codigo)

print("Join con broadcast:")
%time df_join_broadcast.count()

🔎 Qué esperar en tiempos:

 - Join normal → Spark hace un shuffle grande de df_vuelos y df_aeropuertos.
 - Join broadcast → Spark replica la tabla pequeña en cada executor y evita shuffle → más rápido.

In [0]:
print("=== Plan join normal ===")
df_join_normal.explain("extended")

In [0]:
print("=== Plan join broadcast ===")
df_join_broadcast.explain("extended")

### ¿En qué escenario usarías broadcast join en producción?

✅ Cuando:
- Una de las tablas es pequeña (cientos de MB como máximo, normalmente < 10^8 filas).
- La otra tabla es muy grande (miles de millones de filas).
- Quieres evitar un shuffle costoso.

❌ Evitar broadcast si:
- La tabla pequeña no cabe en memoria.
- Hay muchas ejecuciones concurrentes → replicar la tabla puede saturar los workers.


👉 Conclusión:
Un broadcast join es ideal para dimension tables (ej. catálogos de aeropuertos, países, monedas) unidas a fact tables grandes (ej. vuelos, tickets, reservas).

### 4. Cache y persist con criterio


Spark es "lazy" → recalcula un DataFrame cada vez que lo usas.  
Si vas a reutilizarlo varias veces, merece la pena **cachear**. Es decir: `cache` convierte resultados intermedios en algo reutilizable.

❌ Mala práctica: cachear todo "por si acaso".  
✅ Buena práctica: cachear solo cuando se reutiliza varias veces.

Modos:
- `df.cache()` → RAM.
- `df.persist(StorageLevel.MEMORY_AND_DISK)` → RAM y disco.

In [0]:
from pyspark.sql import functions as F
import time

# Dataset grande
df_vuelos = spark.range(0, 20_000_000).withColumn("delay", (F.rand()*500).cast("int"))

# 🚫 Sin cache
df_delay = df_vuelos.filter("delay > 200")

print("== SIN CACHE ==")
t0 = time.time()
df_delay.count()  # Primera acción
print("Primera acción:", round(time.time()-t0, 2), "seg")

t0 = time.time()
df_delay.agg(F.avg("delay")).collect()  # Segunda acción
print("Segunda acción:", round(time.time()-t0, 2), "seg")


# ✅ Con cache
df_delay_cached = df_vuelos.filter("delay > 200").cache()

print("\n== CON CACHE ==")
t0 = time.time()
df_delay_cached.count()  # Primera acción (carga en memoria)
print("Primera acción (con cache):", round(time.time()-t0, 2), "seg")

t0 = time.time()
df_delay_cached.agg(F.avg("delay")).collect()  # Segunda acción (ya cacheado)
print("Segunda acción (con cache):", round(time.time()-t0, 2), "seg")


![Cache en DAG](/Workspace/Users/psanzc@hotmail.com/optimizacion_databricks/Fotos/Ej1 - Job4.png)

🔎 Qué esperar en los tiempos:

- Sin cache → la segunda acción es lenta porque Spark vuelve a recalcular el plan completo (filter sobre los 20M de filas).
- Con cache → la primera acción tarda más (porque Spark calcula + guarda en memoria), pero la segunda acción es mucho más rápida, ya que lee los datos directamente desde RAM/SSD.

### ¿Qué riesgos tiene cachear demasiados DataFrames?

⚠️ Riesgos en producción:

- Consumo de memoria elevado → si cacheas demasiados DataFrames grandes, puedes agotar la memoria del cluster.
- Eviction (expulsión del cache) → Spark liberará cachés antiguos si no hay espacio → recalculando todo de nuevo, lo que puede ser peor que no cachear.
- Persistencia incorrecta → si los datos son poco reutilizados, el overhead de mantenerlos en memoria puede ser mayor que el beneficio.

👉 Buenas prácticas:
- Cachear solo lo que realmente vas a reutilizar varias veces.
- Usar `.unpersist()` cuando ya no lo necesites.
- Monitorizar en Spark UI → pestaña Storage muestra qué DataFrames están cacheados, su tamaño y en qué nivel (MEMORY, DISK).

### Ejercicio Bloque 1

La aerolínea quiere analizar los vuelos **con más de 3 horas de retraso**.  

Tareas:
1. Filtra el DataFrame de vuelos para quedarte con esos casos.  
2. Haz un join con la tabla de aeropuertos (usa `broadcast`).  
3. Calcula el retraso medio por ciudad origen.  
4. Cachea el DataFrame si lo vas a usar más de una vez.  
5. Comprueba que tu plan físico (`explain("extended")`) muestre:
   - Filtro temprano.
   - Broadcast join.
   - Cache.

In [0]:
from pyspark.sql import functions as F

# Dataset de vuelos simulado
vuelos = spark.range(0, 20_000_000) \
    .withColumn("origen", (F.rand()*10).cast("int")) \
    .withColumn("delay", (F.rand()*400).cast("int"))  # retrasos hasta 400 min

# Dataset de aeropuertos (pequeño → ideal para broadcast)
aeropuertos = spark.createDataFrame([
    (0, "Madrid"), (1, "Barcelona"), (2, "Valencia"),
    (3, "Bilbao"), (4, "Sevilla"), (5, "Gran Canaria"),
    (6, "Málaga"), (7, "Zaragoza"), (8, "Alicante"), (9, "Santander")
], ["origen", "ciudad"])

In [0]:
# Tu codigo aqui

## 1. Filtrar vuelos con más de 3 horas de retraso (180 minutos)


In [0]:
# Tu codigo aqui

## 2. Join con tabla de aeropuertos usando broadcast

In [0]:
# Tu codigo aqui

## 3. Calcular retraso medio por ciudad

In [0]:
# Tu codigo aqui

## 4. Cachear el DataFrame (porque lo usaremos varias veces)


Guardamos el DataFrame en memoria como tabla temporal o en Delta/Lakehouse y lo leemos de nuevo. Así evitamos recalcular la query original.

In [0]:
# Tu codigo aqui

## 4.2. Simular el cache

### Guardar en disco en formato Delta

### Leer de nuevo (simulación de cache)

### Primera acción

### Segunda acción (ya sin recomputar)


In [0]:
# 5. Verificar con .explain("extended")


🔎 Qué deberíamos ver en el plan físico:

- Filtro temprano: el `Filter (delay > 180)` aparece justo después del Range inicial.
- Broadcast join: Spark indica `BroadcastHashJoin`.
- Cache: Spark mostrará un `InMemoryTableScan` en ejecuciones posteriores → señal de que lee desde cache.

In [0]:
# 6.1 Comprobar tiempos
import time



In [0]:
# 6.2 Leer de nuevo (simulación de cache)


## Bloque 2. Optimización de particionado y tamaño de archivos

### 1. Ajuste de `spark.sql.shuffle.partitions`

In [0]:
# Revisamos el valor actual
spark.conf.get("spark.sql.shuffle.partitions")

In [0]:
from pyspark.sql import functions as F
import random

# Creamos dataset de 1 millón de filas con año entre 2010 y 2019
data = [(i, random.randint(2010, 2019), f"name_{i%100}") for i in range(1, 1000001)]
df = spark.createDataFrame(data, ["id", "year", "name"])

In [0]:
display(df)

In [0]:
# Función para medir tiempo
import time

def test_shuffle_partitions(n_partitions):
    spark.conf.set("spark.sql.shuffle.partitions", n_partitions)
    start = time.time()
    df.groupBy("name").count().collect()
    end = time.time()
    print(f"Partitions: {n_partitions}, Time: {end - start:.2f} s")

# Probamos diferentes valores
test_shuffle_partitions(50)
test_shuffle_partitions(200)
test_shuffle_partitions(10000)

✅ Claves:

- Mientras más particiones, mayor overhead de tareas pequeñas (task scheduling), pero menor tamaño de cada partición.
- Mientras menos particiones, más riesgo de tareas grandes (skew) y uso elevado de memoria.
- La variación se nota en datasets grandes y con operaciones de shuffle (joins, groupBy, aggregations).

### 2. Particionamiento de tablas Delta

El particionamiento físico de Delta tables mejora las consultas que filtran por la columna de partición.

In [0]:
# Guardamos en Delta particionando por 'year'
df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").partitionBy("year").save("dbfs:/Volumes/optimizacion/sesion_2/volumen/delta_table_partitioned")

Filtrando por la columna de partición

Aquí debería hacer _partition pruning_.

In [0]:
df_part = spark.read.format("delta").load("dbfs:/Volumes/optimizacion/sesion_2/volumen/delta_table_partitioned")

df_filtered_partition = df_part.filter(F.col("year") == 2015) \
                               .withColumn("input_file", F.col("_metadata.file_path"))

display(df_filtered_partition)

print("Archivos leídos al filtrar por partición (year=2015):")
display(df_filtered_partition.select("input_file").distinct())



- Solo se leen archivos dentro de la partición "n".

2. Filtrando por otra columna

👉 Aquí no hay pruning.

In [0]:
df_filtered_other = df_part.filter(F.col("name") == "name_42") \
                           .withColumn("input_file", F.col("_metadata.file_path"))

display(df_filtered_other)

print("Archivos leídos al filtrar por otra columna (name):")
display(df_filtered_other.select("input_file").distinct())



- Se leen archivos de varias particiones, porque Spark no puede usar el particionamiento.

Conclusión
- Filtrar por la columna de partición (year) → menos archivos leídos → consulta más rápida.
- Filtrar por otra columna (name) → Spark escanea todos los archivos → más lento.

## Bloque 3. Optimización en Databricks

### Optimización de ficheros

Simulamos que hay un problema de muchos archivos pequeños

In [0]:
import random

# Simulamos 5 millones de registros con varias columnas
data = [(i, random.randint(2010, 2020), f"airline_{random.randint(1,50)}", random.randint(1, 1000)) 
        for i in range(1, 500001)]
df = spark.createDataFrame(data, ["flight_id", "year", "airline", "passengers"])

# Escribimos en pequeñas tandas para generar muchos archivos
for i in range(10):
    df.limit(50000).write.format("delta").option("mergeSchema", "true").mode("append").save("dbfs:/Volumes/optimizacion/sesion_2/volumen/delta_flights_smallfiles")

display(dbutils.fs.ls("dbfs:/Volumes/optimizacion/sesion_2/volumen/delta_flights_smallfiles"))


In [0]:
def contar_ficheros(path):
    files = dbutils.fs.ls(path)
    # Filtramos solo los archivos Parquet de datos (ignorar _delta_log)
    parquet_files = [f for f in files if f.name.endswith(".parquet")]
    print(f"Número de archivos de datos: {len(parquet_files)}")


In [0]:
contar_ficheros("dbfs:/Volumes/optimizacion/sesion_2/volumen/delta_flights_smallfiles")

In [0]:
%sql
-- Compactamos todos los archivos pequeños en menos archivos grandes
OPTIMIZE delta.`dbfs:/Volumes/optimizacion/sesion_2/volumen/delta_flights_smallfiles`

In [0]:
contar_ficheros("dbfs:/Volumes/optimizacion/sesion_2/volumen/delta_flights_smallfiles")

#### Z-Order

In [0]:
from pyspark.sql import functions as F
import random

# 10 millones de filas, airline con 500 valores distintos
data = [(i, f"airline_{random.randint(1,500)}", random.randint(2010, 2020)) 
        for i in range(1, 1000001)]

df = spark.createDataFrame(data, ["flight_id", "airline", "year"])

# Guardamos con muchos pequeños archivos
df.repartition(2000).write.format("delta").mode("overwrite").save("dbfs:/Volumes/optimizacion/sesion_2/volumen/delta_zorder_demo")


Primero ejecutamos una query que filtre por una aerolínea en la tabla sin optimizar.

In [0]:
import time

df = spark.read.format("delta").load("dbfs:/Volumes/optimizacion/sesion_2/volumen/delta_zorder_demo")

start = time.time()
res1 = df.filter(F.col("airline") == "airline_250").count()
end = time.time()

print(f"Antes de Z-ORDER: {end-start:.2f} s, Registros encontrados: {res1}")



Ahora mejoramos el layout de archivos con Z-ORDER en airline:

In [0]:
%sql
OPTIMIZE delta.`dbfs:/Volumes/optimizacion/sesion_2/volumen/delta_zorder_demo`
ZORDER BY (airline)

Esto reorganiza los datos físicamente en disco de modo que valores similares de airline estén más próximos → menos archivos escaneados.

In [0]:
df_opt = spark.read.format("delta").load("dbfs:/Volumes/optimizacion/sesion_2/volumen/delta_zorder_demo")

start = time.time()
res2 = df_opt.filter(F.col("airline") == "airline_250").count()
end = time.time()

print(f"Después de Z-ORDER: {end-start:.2f} s, Registros encontrados: {res2}")


- Misma cantidad de registros.
- Menor tiempo de ejecución.

**Conclusion:**
- Cuando haces consultas frecuentes por columnas con alta cardinalidad (ej. airline, customer_id, flight_number).
- Cuando esas columnas no son buenas candidatas para particionamiento físico (ej. demasiados valores distintos).
- Cuando necesitas reducir el número de archivos escaneados en queries selectivas.

**Particionar vs. Z-ORDER**
- Particionar físicamente → crea carpetas en disco (/year=2020/, /year=2021/). Solo conviene en columnas de baja cardinalidad (ej. year, region).
- Z-ORDER → no crea carpetas, sino que ordena los datos dentro de los archivos Delta para mejorar localización. Ideal en columnas de alta cardinalidad.

#### Motores

##### Adaptive Query Execution (AQE)

- AQE = Adaptive Query Execution.
- Spark ajusta dinámicamente el número de particiones y la estrategia de join durante la ejecución de la query.
- Así evita problemas como:
    - Demasiadas particiones pequeñas → exceso de overhead.
    - Pocas particiones grandes → skew (particiones desbalanceadas).
- AQE ya está habilitado por defecto.
- No se puede cambiar spark.conf, pero sí podemos observar el Query Profile

In [0]:
spark.conf.get("spark.sql.adaptive.enabled", "false")

In [0]:
# Forzamos una operación de shuffle grande
# Generamos 5 millones de filas con datos de vuelos
data = [
    (i, random.randint(2010, 2020), f"airline_{random.randint(1,500)}", random.randint(50, 300))
    for i in range(1, 5000001)
]

df = spark.createDataFrame(data, ["flight_id", "year", "airline", "passengers"])

agg = df.groupBy("airline").agg(F.sum("passengers").alias("total_passengers"))
agg.explain("formatted")

- Si AQE entra en juego, verás que Spark reporta un número de particiones distinto al que esperarías (ajustado dinámicamente).
- También puede que reemplace un SortMergeJoin por un BroadcastJoin si detecta que una tabla es pequeña en tiempo de ejecución.

##### Photon

- Photon = motor de ejecución de consultas vectorizado escrito en C++.
- Acelera operaciones típicas de analítica (scans, filters, joins, aggregates).

%md
![Cluster Photon](/Workspace/Users/psanzc@hotmail.com/optimizacion_databricks/Fotos/Crear cluster 2.png)

##### Autoscaling

- Es la función de Databricks que aumenta o reduce automáticamente el número de nodos del cluster según la carga.
- Ayuda a equilibrar costo vs. rendimiento.
- Pero en Databricks Free Edition no se puede habilitar autoscaling, ya que el cluster siempre tiene 1 nodo.

In [0]:
from concurrent.futures import ThreadPoolExecutor
from pyspark.sql import functions as F

data = [
    (i, random.randint(2010, 2020), f"airline_{random.randint(1,500)}", random.randint(50, 300))
    for i in range(1, 100001)
]

df = spark.createDataFrame(data, ["flight_id", "year", "airline", "passengers"])

def heavy_query():
    return df.groupBy("year").agg(F.avg("passengers")).collect()

with ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(heavy_query) for _ in range(5)]


- En Free Edition: no verán escalado, pero sí notarán que varias queries corren en paralelo y tardan más porque comparten el único nodo.
- En un cluster real con autoscaling: se habrían añadido más nodos automáticamente.