# Persistencia y particionado
En este tema trataremos dos aspectos de Apache Spark

- **Persistencia**: cómo guardar DataFrames y RDDs de forma que no tengan que ser recalculados
- **Particionado**: cómo especificar y cambiar las particiones de un DataFrame o RDD

## Persistencia

Problema al usar un DataFrame o un RDD varias veces:

-   Spark recomputa el RDD y sus dependencias cada vez que se ejecuta una acción
-   Muy costoso (especialmente en problemas iterativos)

Solución

-   Conservar el DataFrame o RDD en memoria y/o disco
-   Métodos `cache()` o `persist()`

### Niveles de persistencia (definidos en [`pyspark.StorageLevel`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.StorageLevel.html) y [`org.apache.spark.storage.StorageLevel`](https://spark.apache.org/docs/3.5.1/api/scala/org/apache/spark/storage/StorageLevel$.html))

 Nivel                | Espacio  | CPU     | Memoria/Disco   | Descripción
 :------------------: | :------: | :-----: | :-------------: | ------------------
 MEMORY_ONLY          |   Alto   |   Bajo  |     Memoria     | Guarda el RDD como un objeto Java no serializado en la JVM. Si el RDD no cabe en memoria, algunas particiones no se *cachearán* y serán recomputadas "al vuelo" cada vez que se necesiten. 
 MEMORY_AND_DISK      |   Alto   |   Medio |     Ambos       | Guarda el RDD como un objeto Java no serializado en la JVM. Si el RDD no cabe en memoria, las particiones que no quepan se guardan en disco y se leen del mismo cada vez que se necesiten.
 DISK_ONLY            |   Bajo   |   Alto  |     Disco       | Guarda las particiones del RDD solo en disco.
 OFF_HEAP             |   Bajo   |   Alto  |   Memoria       | Similar a MEMORY_ONLY_SER pero guarda el RDD serializado usando memoria *off-heap* (fuera del heap de la JVM) lo que puede reducir el overhead del recolector de basura.
   


    
### Nivel de persistencia

-   El nivel por defecto para DataFrames es MEMORY_ONLY

-   En Python, los datos siempre se guardan en memoria serializados (usando *pickle*)

    - Es posible especificar serialización (la forma en la que se serializan los datos para mantenerlos en memoria o en disco). Por defecto se utiliza el serializador "Pickle" en Python

    
### Recuperación de fallos

-   Si falla un nodo con datos almacenados, el DataFrame o RDD se recomputa

    -   Añadiendo `_2` (ó `_3`) al nivel de persistencia (por ejemplo, MEMORY_ONLY_2), se guardan 2 copias del RDD
        
### Gestión de la cache

-   Algoritmo LRU (Least Recently Used) para gestionar la cache

    -   Para niveles *solo memoria*, los RDDs viejos se eliminan y se recalculan
    -   Para niveles *memoria y disco*, las particiones que no caben se escriben a disco
    
### Importante:

- La persistencia debe usarse solo cuando sea necesaria, puesto que puede implicar un coste importante


## Persistencia con DataFrames

In [None]:
import os

from pyspark import SparkContext
from pyspark.sql import SparkSession

# Elegir el máster de Spark dependiendo de si se ha definido la variable de entorno HADOOP_CONF_DIR o YARN_CONF_DIR
SPARK_MASTER: str = (
    "yarn" if "HADOOP_CONF_DIR" in os.environ or "YARN_CONF_DIR" in os.environ else "local[*]"
)
print(f"Usando Spark Master en {SPARK_MASTER}")

# Creamos un objeto SparkSession (o lo obtenemos si ya está creado)
spark: SparkSession = (
    SparkSession.builder.appName("Mi aplicacion")
    .config("spark.rdd.compress", "true")
    .config("spark.executor.memory", "6g")
    .config("spark.driver.memory", "6g")
    .master(SPARK_MASTER)
    .getOrCreate()
)

sc: SparkContext = spark.sparkContext

In [None]:
from collections.abc import Iterator

import numpy as np
from pyspark.sql import Row
from pyspark.sql.dataframe import DataFrame


def generate_random_data(n: int) -> Iterator[Row]:
    """Generador que produce Rows con datos aleatorios sin almacenarlos previamente."""
    row_type = Row("n", "x")
    yield from (row_type(i, float(np.random.random())) for i in range(n))


DF1: DataFrame = spark.createDataFrame(generate_random_data(100000))

DF1.printSchema()
print(f"Cacheado: {DF1.is_cached}.")
print(f"Nivel sin persistencia: {DF1.storageLevel}.")

In [None]:
DF1.cache()
print(f"Cacheado: {DF1.is_cached}.")
print(f"Nivel de persistencia por defecto: {DF1.storageLevel}.")

In [None]:
# La persistencia no se hereda en las transformaciones
DF2: DataFrame = DF1.groupBy("x").count()
print(f"Cacheado: {DF2.is_cached}.")

In [None]:
from pyspark import StorageLevel

# Para cambiar el nivel de persistencia, primero tenemos que quitarlo de la cache
DF1.unpersist()

DF1.persist(StorageLevel.MEMORY_ONLY_2)
print(f"Cacheado: {DF1.is_cached}.")
print(f"Número de particiones: {DF1.rdd.getNumPartitions()}.")
print(f"Nuevo nivel de persistencia: {DF1.storageLevel}.")

### Persistencia con RDDs

In [None]:
from pyspark import RDD

rdd: RDD[int] = sc.parallelize(range(1000), 10)
print(f"Cacheado: {rdd.is_cached}")
print(f"Particiones: {rdd.getNumPartitions()}")
print(f"Nivel de persistencia sin cachear: {rdd.getStorageLevel()}")

In [None]:
rdd.cache()

print(f"Cacheado: {rdd.is_cached}")
print(f"Nivel de persistencia por defecto: {rdd.getStorageLevel()}")

## Particionado en Spark

El particionado es crucial para el rendimiento de Spark. Afecta a:
- **Paralelismo**: más particiones → más tareas paralelas
- **Shuffles**: operaciones como joins y aggregaciones reorganizan datos entre particiones
- **Escritura de archivos**: cada partición genera uno o más archivos
- **Localidad de datos**: mantener datos relacionados juntos reduce la comunicación de red

### Conceptos clave

**Para DataFrames:**
- `spark.sql.shuffle.partitions` (por defecto 200): número de particiones en operaciones *wide* (joins, aggregaciones)
- Se puede cambiar dinámicamente: `spark.conf.set("spark.sql.shuffle.partitions", 100)`

**Para RDDs:**
- `spark.default.parallelism`: número de particiones por defecto en transformaciones
- Propiedad: `sc.defaultParallelism`

**Funciones útiles:**
- `df.rdd.getNumPartitions()`: obtener número de particiones
- `df.repartition(n)`: crear exactamente n particiones (provoca shuffle completo)
- `df.coalesce(n)`: reducir particiones sin shuffle completo (más eficiente)
- `df.repartition(n, "columna")`: particionar por columna específica

### Cuándo preocuparse por el particionado

1. **Escritura de archivos** → muchas particiones = muchos archivos pequeños
2. **Joins grandes** → mal particionado = shuffles lentos
3. **Agregaciones con datos desbalanceados** → algunas particiones muy grandes
4. **Out of Memory** → demasiados datos en una partición

### Caso 1: Particionado al escribir archivos Parquet

El problema más común: escribir DataFrames con muchas particiones genera miles de archivos pequeños (*small files problem*)

In [None]:
# Creamos un DataFrame de ejemplo
from collections.abc import Iterable

from pyspark.sql import Row
from pyspark.sql.dataframe import DataFrame


def data_gen() -> Iterable[Row]:
    for _ in range(1000):
        yield from [
            Row(usuario_id=1, país="ES", año=2023, posts=10),
            Row(usuario_id=2, país="FR", año=2023, posts=5),
            Row(usuario_id=3, país="ES", año=2024, posts=8),
            Row(usuario_id=4, país="IT", año=2023, posts=12),
            Row(usuario_id=5, país="FR", año=2024, posts=15),
            Row(usuario_id=6, país="ES", año=2024, posts=20),
        ]


dfUsuarios: DataFrame = spark.createDataFrame(data_gen())
print(f"Número de particiones por defecto: {dfUsuarios.rdd.getNumPartitions()}")
dfUsuarios.show(10)

In [None]:
# MALO: escribir con muchas particiones crea muchos archivos pequeños
import os
import shutil

output_dir = "/tmp/usuarios_mal_particionado"
if os.path.exists(output_dir):
    shutil.rmtree(output_dir)

dfUsuarios.write.parquet(output_dir)

# Veamos cuántos archivos se crearon
archivos: list[str] = [f for f in os.listdir(output_dir) if f.endswith('.parquet')]
print(f"Se crearon {len(archivos)} archivos pequeños")
print(f"Archivos: {archivos[:5]}...")  # Mostramos solo los primeros 5

BUENO: consolidar particiones antes de escribir.

In [None]:
output_dir_bueno = "/tmp/usuarios_bien_particionado"
if os.path.exists(output_dir_bueno):
    shutil.rmtree(output_dir_bueno)

# Reducir a 2 particiones antes de escribir
dfUsuarios.coalesce(2).write.parquet(output_dir_bueno)

archivos_buenos: list[str] = [f for f in os.listdir(output_dir_bueno) if f.endswith('.parquet')]
print(f"Se crearon {len(archivos_buenos)} archivos optimizados")
print(f"Archivos: {archivos_buenos}")

MEJOR: Particionar por columnas para consultas eficientes.

In [None]:
output_dir_particionado = "/tmp/usuarios_particionado"
if os.path.exists(output_dir_particionado):
    shutil.rmtree(output_dir_particionado)

# Particionar por año y país crea una estructura de directorios
dfUsuarios.write.partitionBy("año", "país").parquet(output_dir_particionado)

print("Estructura de directorios creada:")
for root, dirs, files in os.walk(output_dir_particionado):
    level: int = root.replace(output_dir_particionado, '').count(os.sep)
    indent: str = ' ' * 2 * level
    print(f'{indent}{os.path.basename(root)}/')
    if level < 2:  # Solo mostramos 2 niveles
        subindent: str = ' ' * 2 * (level + 1)
        for file in files[:2]:  # Solo primeros 2 archivos
            print(f'{subindent}{file}')

In [None]:
# Ventaja: Spark solo lee las particiones necesarias (predicate pushdown)
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.functions import col

df_leido: DataFrame = spark.read.parquet(output_dir_particionado)

# Solo lee la carpeta año=2024/pais=ES/
df_filtrado: DataFrame = df_leido.filter((col("año") == 2024) & (col("país") == "ES"))

print("Al filtrar por año y país, Spark solo lee esas particiones:")
df_filtrado.show(5)

# Podemos ver qué particiones se leyeron en el plan de ejecución
df_filtrado.explain("formatted")

### Caso 2: Particionado para joins eficientes

Cuando hacemos un join, Spark debe reorganizar los datos (shuffle). Reparticionar por la clave del join puede mejorar el rendimiento.

In [None]:
# Creamos dos DataFrames para hacer join
from pyspark.sql.dataframe import DataFrame

dfPreguntas: DataFrame = spark.createDataFrame(
    [Row(pregunta_id=i, usuario_id=i % 100, texto=f"Pregunta {i}") for i in range(1000)]
)

dfRespuestas: DataFrame = spark.createDataFrame(
    [Row(respuesta_id=i, pregunta_id=i % 500, respuesta=f"Respuesta {i}") for i in range(2000)]
)

print(f"Preguntas: {dfPreguntas.rdd.getNumPartitions()} particiones")
print(f"Respuestas: {dfRespuestas.rdd.getNumPartitions()} particiones")

Join sin optimizar: Spark hace shuffle con 200 particiones (por defecto)


In [None]:
from pyspark.sql.dataframe import DataFrame

resultado_normal: DataFrame = dfPreguntas.join(dfRespuestas, "pregunta_id", "inner")
print(f"Join normal: {resultado_normal.rdd.getNumPartitions()} particiones")
resultado_normal.show(5)

Optimizado: reparticionamos ambos DataFrames por la clave del join.
Esto co-localiza los datos relacionados en las mismas particiones.

In [None]:
from pyspark.sql.dataframe import DataFrame

dfPreguntas_part: DataFrame = dfPreguntas.repartition(50, "pregunta_id")
dfRespuestas_part: DataFrame = dfRespuestas.repartition(50, "pregunta_id")

resultado_optimizado: DataFrame = dfPreguntas_part.join(dfRespuestas_part, "pregunta_id", "inner")
print(f"Join optimizado: {resultado_optimizado.rdd.getNumPartitions()} particiones")
resultado_optimizado.show(5)

# El join será más eficiente porque los datos ya están co-localizados

### Caso 3: Control del número de particiones en shuffles

Por defecto, Spark usa 200 particiones en operaciones *wide*. Esto puede ser demasiado o insuficiente dependiendo del tamaño de los datos.

In [None]:
# Ver el valor actual de spark.sql.shuffle.partitions
from pyspark.sql.dataframe import DataFrame

print(f"Particiones por defecto en shuffles: {spark.conf.get('spark.sql.shuffle.partitions')}")

# Hacer una agregación (operación wide)
resultado_agg: DataFrame = dfUsuarios.groupBy("país", "año").sum("posts")
print(f"Particiones después de groupBy: {resultado_agg.rdd.getNumPartitions()}")

In [None]:
# Ajustar el número de particiones para datasets pequeños
from pyspark.sql.dataframe import DataFrame

spark.conf.set("spark.sql.shuffle.partitions", 10)

resultado_agg_optimizado: DataFrame = dfUsuarios.groupBy("país", "año").sum("posts")
print(
    f"Particiones después de groupBy optimizado: {resultado_agg_optimizado.rdd.getNumPartitions()}"
)

# Regla general:
# - Datos pequeños (< 1GB): 10-50 particiones
# - Datos medianos (1-10GB): 50-200 particiones
# - Datos grandes (> 10GB): 200+ particiones

### Caso 4: Adaptive Query Execution (AQE)

AQE es una característica moderna de Spark que optimiza automáticamente el número de particiones durante la ejecución.

In [None]:
# Habilitar AQE (en Spark 3.2+ está habilitado por defecto)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

print("Configuración de AQE:")
print(f"  - AQE habilitado: {spark.conf.get('spark.sql.adaptive.enabled')}")
print(f"  - Coalesce automático: {spark.conf.get('spark.sql.adaptive.coalescePartitions.enabled')}")
print(f"  - Manejo de skewed joins: {spark.conf.get('spark.sql.adaptive.skewJoin.enabled')}")

In [None]:
# Con AQE, Spark ajustará automáticamente las particiones
# Ejemplo: después de un filtro que reduce mucho los datos

from pyspark.sql.dataframe import DataFrame

spark.conf.set("spark.sql.shuffle.partitions", 200)  # Empezamos con 200

resultado_filtrado: DataFrame = dfUsuarios.filter(col("país") == "ES").groupBy("año").sum("posts")

print("Particiones iniciales configuradas: 200")
print(f"Particiones finales (después de AQE): {resultado_filtrado.rdd.getNumPartitions()}")
print("\nAQE combinó particiones pequeñas automáticamente")

resultado_filtrado.show()

### Caso 5: Bucketing para tablas persistentes

Bucketing es útil cuando haremos joins repetidos sobre las mismas columnas. Pre-organiza los datos en "buckets".

In [None]:
# Crear tablas con bucketing
# Los datos se organizan en buckets por usuario_id

# Primero limpiamos tablas existentes
from pyspark.sql.dataframe import DataFrame

spark.sql("DROP TABLE IF EXISTS preguntas_bucketed")
spark.sql("DROP TABLE IF EXISTS respuestas_bucketed")

# Crear tablas con bucketing
dfPreguntas.write.bucketBy(50, "usuario_id").sortBy("pregunta_id").mode("overwrite").saveAsTable(
    "preguntas_bucketed"
)

# Crear DataFrame de respuestas con usuario_id
dfRespuestas_conUsuario: DataFrame = dfRespuestas.withColumn("usuario_id", col("pregunta_id") % 100)

dfRespuestas_conUsuario.write.bucketBy(50, "usuario_id").sortBy("respuesta_id").mode(
    "overwrite"
).saveAsTable("respuestas_bucketed")

print("Tablas creadas con bucketing por usuario_id.")

In [None]:
# Los joins sobre tablas bucketed son mucho más eficientes
# No necesitan shuffle porque los datos ya están co-localizados

from pyspark.sql.dataframe import DataFrame

df_preg: DataFrame = spark.table("preguntas_bucketed")
df_resp: DataFrame = spark.table("respuestas_bucketed")

resultado_bucketed: DataFrame = df_preg.join(df_resp, "usuario_id", "inner")

print("Join sobre tablas bucketed:")
resultado_bucketed.show(5)

# Podemos ver en el plan de ejecución que NO hay shuffle
print("\nPlan de ejecución (nota: sin Exchange/Shuffle):")
resultado_bucketed.explain("simple")

### Caso 6: Manejo de datos desbalanceados (Data Skew)

Cuando algunos valores de una clave tienen muchos más datos que otros, puede causar que algunas tareas tomen mucho más tiempo.

In [None]:
# Ejemplo: crear datos desbalanceados (un usuario con muchos posts)

from pyspark.sql.dataframe import DataFrame

data_skewed = []
# Usuario 1 tiene 10,000 posts (skew!)
data_skewed.extend([Row(usuario_id=1, post=f"Post {i}") for i in range(10000)])
# Usuarios 2-100 tienen solo 10 posts cada uno
for user in range(2, 101):
    data_skewed.extend([Row(usuario_id=user, post=f"Post {i}") for i in range(10)])

dfSkewed: DataFrame = spark.createDataFrame(data_skewed)

print(f"Total de filas: {dfSkewed.count()}")
print("\nDistribución de posts por usuario:")
dfSkewed.groupBy("usuario_id").count().orderBy(col("count").desc()).show(5)

In [None]:
# Sin AQE, el skew causa que una tarea sea muy lenta
from pyspark.sql.dataframe import DataFrame

spark.conf.set("spark.sql.adaptive.enabled", "false")
spark.conf.set("spark.sql.shuffle.partitions", 10)

# Hacer un groupBy - una partición tendrá 10,000 filas, las demás ~10
resultado_sin_aqe: DataFrame = dfSkewed.groupBy("usuario_id").count()
print(f"Particiones: {resultado_sin_aqe.rdd.getNumPartitions()}")
print("Una partición tendrá 10,000 filas (usuario 1), causando desbalance")
resultado_sin_aqe.show(5)

In [None]:
# Con AQE, Spark detecta y maneja el skew automáticamente
from pyspark.sql.dataframe import DataFrame

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "10MB")

resultado_con_aqe: DataFrame = dfSkewed.groupBy("usuario_id").count()
print("AQE detectará y manejará la partición desbalanceada automáticamente")
resultado_con_aqe.show(5)

# En un join con skew, AQE dividirá la partición grande en varias más pequeñas

## Apéndice: Operaciones avanzadas con RDDs

Las siguientes operaciones son útiles para casos específicos con RDDs, pero raramente necesarias con DataFrames modernos.

### Trabajando a nivel de partición con RDDs

Permite aplicar operaciones una vez por partición en lugar de una vez por elemento (útil para operaciones costosas como conexiones a bases de datos).

In [None]:
# Ejemplo: mapPartitions con RDDs
from collections.abc import Iterable

from pyspark import RDD

rdd_nums: RDD[int] = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 4)
print("Datos por partición:")
print(rdd_nums.glom().collect())


def suma_y_cuenta(iter: Iterable[int]) -> list[tuple[int, int]]:
    """Suma y cuenta elementos de una partición"""
    suma = 0
    cuenta = 0
    for i in iter:
        suma += i
        cuenta += 1
    return [(suma, cuenta)]


# Aplica la función una vez por partición (no por elemento)
resultado: RDD[tuple[int, int]] = rdd_nums.mapPartitions(suma_y_cuenta)
print("\nSuma y cuenta por partición:")
print(resultado.collect())

In [None]:
# Ejemplo con partitionBy para RDDs clave-valor
rdd_pairs: RDD[tuple[int, int]] = sc.parallelize([(1, 10), (2, 20), (1, 15), (3, 30), (2, 25)], 3)

print(f"Particiones originales: {rdd_pairs.getNumPartitions()}")
print("Datos por partición:")
print(rdd_pairs.glom().collect())

# Particionar por clave (agrupa claves iguales en la misma partición)
rdd_particionado = rdd_pairs.partitionBy(2)
print(f"\nParticiones después de partitionBy(2): {rdd_particionado.getNumPartitions()}")
print("Datos por partición (claves iguales juntas):")
print(rdd_particionado.glom().collect())

## Resumen: Mejores prácticas de particionado

### Siempre hacer:

1. **Usar `coalesce()` antes de escribir** para evitar archivos pequeños
   ```python
   df.coalesce(10).write.parquet("output/")
   ```

2. **Particionar datos por columnas de filtrado frecuente**
   ```python
   df.write.partitionBy("año", "país").parquet("output/")
   ```

3. **Habilitar AQE** (en Spark 3.2+ está habilitado por defecto)
   ```python
   spark.conf.set("spark.sql.adaptive.enabled", "true")
   ```

4. **Ajustar `spark.sql.shuffle.partitions` según el tamaño de los datos**
   - Pequeño (< 1GB): 10-50 particiones
   - Mediano (1-10GB): 50-200 particiones
   - Grande (> 10GB): 200+ particiones

### Evitar:

1. **Demasiadas particiones pequeñas** → overhead de scheduling
2. **Muy pocas particiones grandes** → poco paralelismo, riesgo de OOM
3. **Repartición innecesaria** → shuffle costoso sin beneficio
4. **Particionamiento excesivo al escribir** → demasiadas carpetas/archivos

### Cuándo reparticionar:

- **Antes de joins grandes**: reparticionar por la clave del join
- **Después de filtros fuertes**: usar `coalesce()` para reducir particiones vacías
- **Antes de escribir**: consolidar con `coalesce()` para archivos optimizados
- **Con datos desbalanceados**: dejar que AQE lo maneje automáticamente

### Herramientas de diagnóstico:

```python
# Ver el plan de ejecución
df.explain("formatted")

# Ver número de particiones
df.rdd.getNumPartitions()

# Ver configuración actual
spark.conf.get("spark.sql.shuffle.partitions")
```

In [None]:
# Limpieza de archivos y tablas temporales
import shutil

# Eliminar directorios temporales
for path in [
    "/tmp/usuarios_mal_particionado",
    "/tmp/usuarios_bien_particionado",
    "/tmp/usuarios_particionado",
]:
    if os.path.exists(path):
        shutil.rmtree(path)
        print(f"Eliminado: {path}.")

# Eliminar tablas temporales
spark.sql("DROP TABLE IF EXISTS preguntas_bucketed")
spark.sql("DROP TABLE IF EXISTS respuestas_bucketed")
print("Tablas temporales eliminadas.")

print("\nLimpieza completada.")