# Fundamentos de Apache Spark: Funciones avanzadas

funciones avanzadas para optimizar el rendimiento de Spark, para imputar valores faltantes o a crear funciones definidas por el usuario (UDF).

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import broadcast
from pyspark.sql.types import *

In [0]:
# Lista de tuplas que contiene información sobre los empleados: id, nombre, departamento y salario.
emp = [(1, "AAA", "dept1", 1000),
    (2, "BBB", "dept1", 1100),
    (3, "CCC", "dept1", 3000),
    (4, "DDD", "dept1", 1500),
    (5, "EEE", "dept2", 8000),
    (6, "FFF", "dept2", 7200),
    (7, "GGG", "dept3", 7100),
    (None, None, None, 7500),
    (9, "III", None, 4500),
    (10, None, "dept5", 2500)]

# Lista de tuplas que contiene la relación entre el identificador del departamento y su nombre
dept = [("dept1", "Department - 1"),
        ("dept2", "Department - 2"),
        ("dept3", "Department - 3"),
        ("dept4", "Department - 4")
       ]

# Crear un DataFrame de Spark a partir de la lista de empleados 'emp', especificando los nombres de las columnas
df = spark.createDataFrame(emp, ["id", "name", "dept", "salary"])

# Crear un DataFrame de Spark a partir de la lista de departamentos 'dept', especificando los nombres de las columnas
deptdf = spark.createDataFrame(dept, ["id", "name"])

In [0]:
# Creacion de vista Temporal
df.createOrReplaceTempView("empdf")
deptdf.createOrReplaceTempView("deptdf")

# Guarda en una Tabla HIVE.
df.write.saveAsTable("hive_empdf", mode = "overwrite")
deptdf.write.saveAsTable("hive_deptdf", mode = "overwrite")

# `.cache()` y `.persist()` en PySpark

En PySpark, los métodos `.cache()` y `.persist()` son técnicas de optimización que permiten almacenar los datos de un **DataFrame** o **RDD** para su reutilización, evitando cálculos redundantes. Estos métodos se utilizan cuando se realizan múltiples operaciones sobre los mismos datos, lo cual mejora el rendimiento al no tener que recalcular el mismo resultado varias veces.

## `.cache()`

### Concepto

El método `.cache()` se utiliza para almacenar un **DataFrame** o **RDD** en **memoria**. Este almacenamiento permite acceder rápidamente a los datos durante las siguientes operaciones, ya que no es necesario volver a leer los datos desde el origen o recalcular los resultados de las operaciones previas.

Cuando se llama al método `.cache()`, PySpark almacena el **DataFrame** o **RDD** en la **memoria RAM** de los nodos del clúster. Si hay suficiente memoria disponible, las siguientes operaciones sobre esos datos serán mucho más rápidas.

**Nota**: `.cache()` es simplemente una forma específica de usar `.persist()` con el nivel de almacenamiento `MEMORY_ONLY`, es decir, almacena los datos solo en memoria.

### Cuándo usar `.cache()`

- **Optimización en operaciones repetitivas**: Se recomienda usar `.cache()` cuando sabes que los datos serán utilizados varias veces durante el proceso, evitando que PySpark tenga que volver a calcular o leer los mismos datos una y otra vez.
- **Limitación**: Debido a que solo usa la memoria, si los datos no caben completamente en la memoria, se puede generar un error o un mal desempeño en el procesamiento.

## `.persist()`

### Concepto

El método `.persist()` es más **flexible** que `.cache()`. Permite almacenar un **DataFrame** o **RDD** en varios niveles de almacenamiento que pueden incluir la memoria, el disco, o una combinación de ambos. Esto es útil cuando los datos son demasiado grandes para caber completamente en la memoria, o cuando se necesita un control más granular sobre dónde y cómo se almacenan los datos.

A diferencia de `.cache()`, que utiliza solo la memoria, `.persist()` te permite elegir entre diferentes **niveles de almacenamiento** como:
- **En memoria (MEMORY_ONLY)**: Almacena los datos solo en la memoria.
- **En memoria y disco (MEMORY_AND_DISK)**: Almacena los datos en memoria y, si no caben, los guarda también en disco.
- **Solo en disco (DISK_ONLY)**: Solo almacena los datos en disco, sin usar la memoria.

### Cuándo usar `.persist()`

- **Optimización avanzada**: Usar `.persist()` es ideal cuando se trabaja con grandes volúmenes de datos y se sabe que no cabe todo en memoria. Permite especificar el nivel de almacenamiento adecuado según la situación.
- **Mejor control de recursos**: Proporciona mayor control sobre el uso de recursos, ya que puedes optar por persistir en disco, en memoria, o en una combinación de ambos.

## Diferencias entre `.cache()` y `.persist()`

- **Simplicidad vs Flexibilidad**: `.cache()` es más sencillo de usar, pero ofrece menos flexibilidad que `.persist()`. `.persist()` permite definir el nivel de almacenamiento, lo que resulta más útil cuando se necesita una gestión más avanzada de los recursos.
- **Uso de memoria**: `.cache()` siempre intenta almacenar los datos en memoria, lo cual puede ser un problema si los datos son grandes. Mientras que `.persist()` ofrece más opciones como almacenamiento en disco o una combinación de ambos, lo que permite un manejo más eficiente de los recursos.

## Resumen

- **`.cache()`**: Es una forma rápida y sencilla de almacenar un **DataFrame** o **RDD** en memoria. Es ideal para cuando los datos caben en memoria y se van a reutilizar varias veces durante el proceso.
- **`.persist()`**: Ofrece mayor flexibilidad al permitir almacenar los datos en diferentes niveles de almacenamiento (memoria, disco o combinación de ambos), lo cual es útil para trabajar con grandes volúmenes de datos.

Ambos métodos son herramientas útiles en PySpark para optimizar el rendimiento, especialmente en procesos que requieren múltiples accesos a los mismos datos. Elegir entre `.cache()` y `.persist()` dependerá del tamaño de los datos y los recursos disponibles.


In [0]:
df.cache()
df.count()
print("Memory Used : {0}".format(df.storageLevel.useMemory))
print("Disk Used : {0}".format(df.storageLevel.useDisk))

Memory Used : True
Disk Used : True


Cuando utilizamos la función `.cache()`, en versiones anteriores a Spark 2.1.x (hasta Spark 2.0.2), los datos se almacenan exclusivamente en memoria, utilizando el nivel de almacenamiento `MEMORY_ONLY`. Sin embargo, a partir de Spark 2.1.x, el comportamiento por defecto de `.cache()` cambió y ahora utiliza el nivel de almacenamiento `MEMORY_AND_DISK`, lo que significa que, si los datos no caben completamente en memoria, se almacenarán en disco.

Si necesitamos un control más específico sobre los niveles de almacenamiento, podemos utilizar el método `.persist()`. Este método nos permite seleccionar entre diferentes niveles de almacenamiento, como por ejemplo, si deseamos almacenar los datos solo en memoria, podemos utilizar el nivel `MEMORY_ONLY`. 

A continuación se muestra un ejemplo de cómo especificar este comportamiento utilizando `.persist()`:

```python
df.persist(StorageLevel.MEMORY_ONLY)
```

De este modo, podemos elegir el nivel de almacenamiento que mejor se adapte a nuestras necesidades según el tamaño de los datos y los recursos disponibles.

In [0]:
from pyspark.storagelevel import StorageLevel

In [0]:
deptdf.persist(StorageLevel.MEMORY_ONLY)
deptdf.count()
print("Memory Used : {0}".format(df.storageLevel.useMemory))
print("Disk Used : {0}".format(df.storageLevel.useDisk))

Memory Used : True
Disk Used : True


# `.unpersist()` en PySpark

El método `.unpersist()` en PySpark se utiliza para **liberar el almacenamiento** de un **DataFrame** o **RDD** que previamente ha sido almacenado en memoria o en disco mediante los métodos `.cache()` o `.persist()`. 

Cuando un DataFrame o RDD es persistido, los datos permanecen en la memoria o en el disco hasta que se realice explícitamente una acción para liberarlos. Esto es útil para evitar el consumo innecesario de memoria o recursos de disco una vez que los datos ya no son necesarios, mejorando así el rendimiento y la utilización de los recursos.

## Concepto

- **Liberación de almacenamiento**: `.unpersist()` elimina los datos almacenados en memoria o en disco para un DataFrame o RDD que fue previamente persistido o cacheado.
- **Optimización de recursos**: Este método es útil cuando sabemos que no necesitaremos los datos de nuevo en el futuro cercano, lo que permite liberar memoria o espacio en disco para otras operaciones.

## Sintaxis

```python
df.unpersist()

```
- `df`: El DataFrame o RDD del cual deseas liberar los recursos que ocupan memoria o disco.

## Cuándo usar .unpersist()?
- **Después de completar las operaciones:** Una vez que ya no se necesitan los datos en memoria o en disco, es recomendable llamar a .unpersist() para liberar esos recursos y evitar un uso innecesario de memoria, especialmente cuando se trabajan con grandes volúmenes de datos.

- **Control de recursos:** Es importante liberar los recursos cuando los datos persistidos ya no se usarán en el futuro para optimizar el rendimiento de la aplicación y evitar posibles cuellos de botella.

In [0]:
df.unpersist()

Out[7]: DataFrame[id: bigint, name: string, dept: string, salary: bigint]

# `.clearCache()` en PySpark

El método `.clearCache()` en PySpark se utiliza para **eliminar todo el contenido almacenado en caché** en la sesión de Spark. Este método elimina los **DataFrames** o **RDDs** que han sido almacenados en memoria mediante los métodos `.cache()` o `.persist()`. 

Cuando se utiliza `.clearCache()`, se liberan los recursos de memoria que estaban siendo utilizados por los datos cacheados, lo que puede ser útil para liberar memoria y mejorar el rendimiento del clúster cuando ya no es necesario mantener esos datos en memoria.

## Concepto

- **Liberación de la caché**: `.clearCache()` elimina todos los datos almacenados en la caché en la sesión actual de Spark, liberando la memoria que estaba siendo utilizada.
- **Optimización de recursos**: Es útil para evitar que los recursos de memoria se consuman innecesariamente cuando los datos almacenados en caché ya no son necesarios para las operaciones posteriores.

## Sintaxis

```python
spark.catalog.clearCache()
```
## ¿Cuándo usar .clearCache()?
- **Después de completar todas las operaciones:** Si has utilizado .cache() o .persist() para almacenar datos en memoria, pero ya no los necesitas, puedes usar .clearCache() para liberar la memoria y optimizar el uso de recursos.

- **Cuando se trabaja con grandes volúmenes de datos:** Si los datos cacheados ocupan demasiada memoria o si estás experimentando cuellos de botella de memoria, .clearCache() es útil para liberar esa memoria y continuar con otras tareas.

- **Mantener el clúster eficiente:** Si estás trabajando con múltiples aplicaciones o procesos en un clúster de Spark, liberar la caché puede mejorar la eficiencia global del sistema.

In [0]:
sqlContext.clearCache()

## Expresiones SQL

### `expr` y `selectExpr` en PySpark

En PySpark, tanto **`expr`** como **`selectExpr`** son funciones que nos permiten ejecutar expresiones SQL dentro de un **DataFrame**. Ambas funciones ofrecen una forma flexible y poderosa de realizar transformaciones en los datos utilizando sintaxis SQL directamente en PySpark.

### `expr`

### Concepto

La función **`expr`** se utiliza para **evaluar expresiones SQL** dentro de un **DataFrame**. Permite realizar operaciones sobre columnas utilizando la sintaxis SQL, como cálculos matemáticos, concatenaciones de texto, conversiones de tipos de datos, y más. Esta función es útil cuando queremos aplicar expresiones complejas en columnas o en nuevas columnas.

### Sintaxis

```python
from pyspark.sql.functions import expr

df.withColumn("nueva_columna", expr("columna1 + columna2"))
```
### `selectExpr`

### Concepto
La función `selectExpr` es similar a `select()`, pero con la diferencia de que permite utilizar expresiones SQL dentro de su llamado. Se utiliza para seleccionar columnas o realizar transformaciones directamente con expresiones SQL. Esto es muy útil para realizar operaciones complejas o combinaciones de columnas sin necesidad de escribir múltiples líneas de código.

### Sintaxis
```pyspark
#usarmos "*" para visualizar todos los campos de la tabla
df.selectExpr("*", "expresión SQL")
```


In [0]:
from pyspark.sql.functions import expr

# Intentemos categorizar el salario en Bajo, Medio y Alto según la categorización a continuación.

# 0-2000: salario_bajo
# 2001 - 5000: mid_salary
#> 5001: high_salary

cond = """    CASE 
        WHEN salary > 5000 THEN 'high_salary'
        WHEN salary > 2000 THEN 'mid_salary'
        WHEN salary > 0 THEN 'low_salary'
        ELSE 'invalid_salary'
    END AS salary_level"""

newdf = df.withColumn("salary_level", expr(cond))
newdf.show()

+----+----+-----+------+------------+
|  id|name| dept|salary|salary_level|
+----+----+-----+------+------------+
|   1| AAA|dept1|  1000|  low_salary|
|   2| BBB|dept1|  1100|  low_salary|
|   3| CCC|dept1|  3000|  mid_salary|
|   4| DDD|dept1|  1500|  low_salary|
|   5| EEE|dept2|  8000| high_salary|
|   6| FFF|dept2|  7200| high_salary|
|   7| GGG|dept3|  7100| high_salary|
|null|null| null|  7500| high_salary|
|   9| III| null|  4500|  mid_salary|
|  10|null|dept5|  2500|  mid_salary|
+----+----+-----+------+------------+



In [0]:
newdf = df.selectExpr("*", cond)
newdf.show()

+----+----+-----+------+------------+
|  id|name| dept|salary|salary_level|
+----+----+-----+------+------------+
|   1| AAA|dept1|  1000|  low_salary|
|   2| BBB|dept1|  1100|  low_salary|
|   3| CCC|dept1|  3000|  mid_salary|
|   4| DDD|dept1|  1500|  low_salary|
|   5| EEE|dept2|  8000| high_salary|
|   6| FFF|dept2|  7200| high_salary|
|   7| GGG|dept3|  7100| high_salary|
|null|null| null|  7500| high_salary|
|   9| III| null|  4500|  mid_salary|
|  10|null|dept5|  2500|  mid_salary|
+----+----+-----+------+------------+



# Funciones Definidas por el Usuario (UDF) en PySpark

En PySpark, una **función definida por el usuario (UDF)** es una función personalizada que se puede aplicar a las columnas de un **DataFrame**. Las UDFs permiten a los usuarios escribir funciones en Python (o en otros lenguajes como Java o Scala) y utilizarlas dentro de las operaciones de transformación de PySpark, extendiendo la funcionalidad de las funciones predefinidas que PySpark ofrece.

## ¿Cuándo usar una UDF?

Usamos UDFs cuando queremos realizar transformaciones que no están disponibles a través de las funciones integradas de PySpark, o cuando necesitamos una lógica de procesamiento más personalizada que no se puede expresar fácilmente con las funciones estándar de PySpark.

## ¿Cómo funcionan las UDFs?

Una **UDF** en PySpark recibe una o más columnas de un DataFrame, las procesa y devuelve un valor. Las UDFs son útiles para realizar operaciones sobre las columnas de un DataFrame cuando no se pueden lograr con las funciones predefinidas de PySpark. 

Las UDFs son especialmente útiles cuando:
- Necesitas aplicar operaciones que no están disponibles en las funciones SQL o de PySpark.
- Tienes una lógica compleja que debe ejecutarse en cada fila de un DataFrame.

## Crear y usar una UDF

### Pasos para crear una UDF en PySpark:

1. **Definir una función Python**: Primero, defines una función normal en Python que realizará la operación que deseas aplicar.
2. **Registrar la UDF**: Luego, registras la función en PySpark usando `udf()` y especificas el tipo de retorno.
3. **Aplicar la UDF al DataFrame**: Después, usas la UDF dentro de operaciones como `select()`, `withColumn()`, etc.

### Sintaxis

```python
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Paso 1: Definir una función Python
def my_function(value):
    return value * 2

# Paso 2: Registrar la función como UDF
my_udf = udf(my_function, IntegerType())

# Paso 3: Aplicar la UDF a una columna del DataFrame
df.withColumn("new_column", my_udf(df["existing_column"]))
```

In [0]:
def detSalary_Level(sal:float)->str:
    """
    Esta función determina el nivel salarial en función de un valor dado de salario.

    Parámetros:
    sal (float): El valor del salario a evaluar.

    Retorna:
    str: Un valor de tipo cadena que indica el nivel del salario:
         - 'low_salary' si el salario es menor a 2000,
         - 'mid_salary' si el salario es menor a 5000 (y mayor o igual a 2000),
         - 'high_salary' si el salario es mayor o igual a 5000.
    """
    level = None

    if sal <2000:
       level = 'low_salary'
    elif sal < 5000:
        level = 'mid_salary'
    else:
        level = 'high_salary'
    return level

In [0]:
#Define la funcion udf
sal_level = udf(detSalary_Level, StringType())
#Aplica la funcion al Dataframe
newdf = df.withColumn("salary_level", sal_level("salary"))
newdf.show()

+----+----+-----+------+------------+
|  id|name| dept|salary|salary_level|
+----+----+-----+------+------------+
|   1| AAA|dept1|  1000|  low_salary|
|   2| BBB|dept1|  1100|  low_salary|
|   3| CCC|dept1|  3000|  mid_salary|
|   4| DDD|dept1|  1500|  low_salary|
|   5| EEE|dept2|  8000| high_salary|
|   6| FFF|dept2|  7200| high_salary|
|   7| GGG|dept3|  7100| high_salary|
|null|null| null|  7500| high_salary|
|   9| III| null|  4500|  mid_salary|
|  10|null|dept5|  2500|  mid_salary|
+----+----+-----+------+------------+



### Valores Nulos
Spark tiene pocas funciones especificas para trabajar con valores Faltantes

### Métodos de manejo de valores nulos en PySpark

## `.isNull()`

El método `.isNull()` en PySpark se usa para verificar si el valor de una columna es `NULL`. Devuelve una expresión booleana que puede ser utilizada en filtros para seleccionar filas donde el valor de una columna es nulo.

- **Uso típico**: Se utiliza para filtrar filas donde una columna tiene valores nulos.
- **Ejemplo**: `df.filter(df['column_name'].isNull())`

## `.isNotNull()`

El método `.isNotNull()` es el opuesto de `.isNull()`. Se utiliza para verificar si el valor de una columna no es `NULL`. Devuelve una expresión booleana que puede ser utilizada para seleccionar filas donde el valor de una columna es no nulo.

- **Uso típico**: Se utiliza para filtrar filas donde una columna tiene valores no nulos.
- **Ejemplo**: `df.filter(df['column_name'].isNotNull())`

## `.fillna()`

El método `.fillna()` se utiliza para reemplazar valores nulos (`NULL`) con un valor específico. Este método es útil cuando deseas llenar los valores nulos con un valor predeterminado, como 0, una cadena vacía, o cualquier otro valor.

- **Uso típico**: Se utiliza para reemplazar valores nulos en una o varias columnas con un valor predeterminado.
- **Sintaxis**:
  ```python
  df.fillna(value, subset=None)
  ```
- `value`: El valor con el que deseas reemplazar los valores nulos.

- `subset:` (Opcional) Puedes especificar las columnas sobre las cuales deseas aplicar el reemplazo.
- **Ejemplo**: `df.fillna({'column_name': 0, 'another_column': 'unknown'})`

  ## `.dropna()`

El método `.dropna()` en PySpark se utiliza para eliminar filas que contienen valores nulos (`NULL`) en el DataFrame. Este método es útil cuando deseas limpiar los datos y eliminar las filas que no tienen información completa en una o más columnas.

### Sintaxis:
```python
df.dropna(how='any', thresh=None, subset=None)
```
### Parámetros:
- `how:` Especifica cómo se deben eliminar las filas.

  - **any**: Elimina las filas que tienen al menos un valor nulo en cualquier columna. Este es el valor predeterminado.
  - **all**: Elimina las filas que tienen todos los valores nulos en todas las columnas.
- `thresh`: (Opcional) Especifica el número mínimo de valores no nulos requeridos para que la fila se mantenga. Si una fila tiene menos de este número de valores no nulos, se eliminará.

- `subset`: (Opcional) Permite especificar un subconjunto de columnas para evaluar si deben eliminarse las filas. Si no se especifica, se evalúan todas las columnas.


In [0]:
newdf = df.filter(df["dept"].isNull())
newdf.show()

+----+----+----+------+
|  id|name|dept|salary|
+----+----+----+------+
|null|null|null|  7500|
|   9| III|null|  4500|
+----+----+----+------+



In [0]:
newdf = df.filter(df["dept"].isNotNull())
newdf.show()

+---+----+-----+------+
| id|name| dept|salary|
+---+----+-----+------+
|  1| AAA|dept1|  1000|
|  2| BBB|dept1|  1100|
|  3| CCC|dept1|  3000|
|  4| DDD|dept1|  1500|
|  5| EEE|dept2|  8000|
|  6| FFF|dept2|  7200|
|  7| GGG|dept3|  7100|
| 10|null|dept5|  2500|
+---+----+-----+------+



In [0]:
newdf = df.fillna("INVALID", ["dept"])
newdf.show()

+----+----+-------+------+
|  id|name|   dept|salary|
+----+----+-------+------+
|   1| AAA|  dept1|  1000|
|   2| BBB|  dept1|  1100|
|   3| CCC|  dept1|  3000|
|   4| DDD|  dept1|  1500|
|   5| EEE|  dept2|  8000|
|   6| FFF|  dept2|  7200|
|   7| GGG|  dept3|  7100|
|null|null|INVALID|  7500|
|   9| III|INVALID|  4500|
|  10|null|  dept5|  2500|
+----+----+-------+------+



In [0]:
newdf = df.dropna()
newdf.show()

+---+----+-----+------+
| id|name| dept|salary|
+---+----+-----+------+
|  1| AAA|dept1|  1000|
|  2| BBB|dept1|  1100|
|  3| CCC|dept1|  3000|
|  4| DDD|dept1|  1500|
|  5| EEE|dept2|  8000|
|  6| FFF|dept2|  7200|
|  7| GGG|dept3|  7100|
+---+----+-----+------+



In [0]:
# Elimina todas las filas que contienen todos los valores nulos.
newdf = df.dropna(how = "all")
newdf.show()

+----+----+-----+------+
|  id|name| dept|salary|
+----+----+-----+------+
|   1| AAA|dept1|  1000|
|   2| BBB|dept1|  1100|
|   3| CCC|dept1|  3000|
|   4| DDD|dept1|  1500|
|   5| EEE|dept2|  8000|
|   6| FFF|dept2|  7200|
|   7| GGG|dept3|  7100|
|null|null| null|  7500|
|   9| III| null|  4500|
|  10|null|dept5|  2500|
+----+----+-----+------+



In [0]:
newdf = df.dropna(subset = "dept")
newdf.show()

+---+----+-----+------+
| id|name| dept|salary|
+---+----+-----+------+
|  1| AAA|dept1|  1000|
|  2| BBB|dept1|  1100|
|  3| CCC|dept1|  3000|
|  4| DDD|dept1|  1500|
|  5| EEE|dept2|  8000|
|  6| FFF|dept2|  7200|
|  7| GGG|dept3|  7100|
| 10|null|dept5|  2500|
+---+----+-----+------+



In [0]:
df.rdd.getNumPartitions()

Out[24]: 8