In [91]:
import findspark
from pyspark.sql import SparkSession

from pyspark.sql.functions import col, to_date, to_timestamp
from pyspark.sql.functions import date_format
from pyspark.sql.functions import datediff, months_between, last_day
from pyspark.sql.functions import date_add, date_sub
from pyspark.sql.functions import year, month, dayofmonth, dayofyear, hour, minute, second
from pyspark.sql.functions import ltrim, rtrim, trim
from pyspark.sql.functions import lpad, rpad
from pyspark.sql.functions import concat_ws, lower, upper, initcap, reverse
from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import size, sort_array, array_contains
from pyspark.sql.functions import explode

from pyspark.sql.types import StructType, StructField, StringType, ArrayType, LongType
from pyspark.sql.functions import from_json, to_json

from pyspark.sql.functions import when, lit, coalesce
from pyspark.sql.functions import udf

import pandas as pd
from pyspark.sql.functions import pandas_udf

from pyspark.sql.window import Window
from pyspark.sql.functions import desc, row_number, rank, dense_rank
from pyspark.sql.functions import min, max, avg

findspark.init()
spark = SparkSession.builder.master("local[*]").getOrCreate()

sc = spark.sparkContext

## 1. Funciones de fecha y hora

### 1-1. Convertir de string a fecha y darle formato
to_date, to_timestamp, date_format

In [3]:
df_convertir = spark.read.parquet('./data/convertir/')

In [4]:
df_convertir.printSchema()

root
 |-- date: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- date_str: string (nullable = true)
 |-- ts_str: string (nullable = true)



In [5]:
df_convertir.show(truncate=False)

+----------+-----------------------+----------+----------------+
|date      |timestamp              |date_str  |ts_str          |
+----------+-----------------------+----------+----------------+
|2021-01-01|2021-01-01 20:10:50.723|01-01-2021|18-08-2021 46:58|
+----------+-----------------------+----------+----------------+



In [6]:
# Vamos a convertir los dato a formato date y timestamp

df_convertir1 = df_convertir.select(
    to_date(col('date')).alias('date1'),
    to_timestamp(col('timestamp')).alias('ts1'),
    to_date(col('date_str'), 'dd-MM-yyyy').alias('date2'),
    to_timestamp(col('ts_str'), 'dd-MM-yyyy mm:ss').alias('ts2')
)

In [7]:
df_convertir1.show(truncate=False)

+----------+-----------------------+----------+-------------------+
|date1     |ts1                    |date2     |ts2                |
+----------+-----------------------+----------+-------------------+
|2021-01-01|2021-01-01 20:10:50.723|2021-01-01|2021-08-18 00:46:58|
+----------+-----------------------+----------+-------------------+



In [8]:
df_convertir1.printSchema()

root
 |-- date1: date (nullable = true)
 |-- ts1: timestamp (nullable = true)
 |-- date2: date (nullable = true)
 |-- ts2: timestamp (nullable = true)



In [11]:
# Ahora vamos cambiar el formato de una fecha

df_convertir1.select(
    date_format(col('date1'), 'dd-MM-yyyy')
).show()

+------------------------------+
|date_format(date1, dd-MM-yyyy)|
+------------------------------+
|                    01-01-2021|
+------------------------------+



### 1-2. Cálculos de fecha y hora
datediff, months_between, last_day

In [13]:
df_calculo = spark.read.parquet('./data/calculo')

In [14]:
df_calculo.show()

+------+-------------+------------+-------------------+
|nombre|fecha_ingreso|fecha_salida|       baja_sistema|
+------+-------------+------------+-------------------+
|  Jose|   2021-01-01|  2021-11-14|2021-10-14 15:35:59|
|Mayara|   2021-02-06|  2021-11-25|2021-11-25 10:35:55|
+------+-------------+------------+-------------------+



In [15]:
# Vamos a calcular la diferencia entre la fecha de salida y la fecha de ingreso

df_calculo.select(
    col('nombre'),
    datediff(col('fecha_salida'), col('fecha_ingreso')).alias('dias'),
    months_between(col('fecha_salida'), col('fecha_ingreso')).alias('meses'),
    last_day(col('fecha_salida')).alias('ultimo_dia_mes')
).show()

+------+----+-----------+--------------+
|nombre|dias|      meses|ultimo_dia_mes|
+------+----+-----------+--------------+
|  Jose| 317|10.41935484|    2021-11-30|
|Mayara| 292| 9.61290323|    2021-11-30|
+------+----+-----------+--------------+



In [18]:
# Ahora vamos a sumar y restar fechas, le sumaremos a fecha_ingreso 14 días y le restaremos 1

df_calculo.select(
    col('nombre'),
    col('fecha_ingreso'),
    date_add(col('fecha_ingreso'), 14).alias('mas_14_dias'),
    date_sub(col('fecha_ingreso'), 1).alias('menos_1_dia')
).show()

+------+-------------+-----------+-----------+
|nombre|fecha_ingreso|mas_14_dias|menos_1_dia|
+------+-------------+-----------+-----------+
|  Jose|   2021-01-01| 2021-01-15| 2020-12-31|
|Mayara|   2021-02-06| 2021-02-20| 2021-02-05|
+------+-------------+-----------+-----------+



### 1-3. Extraer valores específicos de una columna date
year, month, dayofmonth, dayofyear, hour, minute, second

In [20]:
df_calculo.select(
    col('baja_sistema'),
    year(col('baja_sistema')),
    month(col('baja_sistema')),
    dayofmonth(col('baja_sistema')),
    dayofyear(col('baja_sistema')),
    hour(col('baja_sistema')),
    minute(col('baja_sistema')),
    second(col('baja_sistema'))
).show()

+-------------------+------------------+-------------------+------------------------+-----------------------+------------------+--------------------+--------------------+
|       baja_sistema|year(baja_sistema)|month(baja_sistema)|dayofmonth(baja_sistema)|dayofyear(baja_sistema)|hour(baja_sistema)|minute(baja_sistema)|second(baja_sistema)|
+-------------------+------------------+-------------------+------------------------+-----------------------+------------------+--------------------+--------------------+
|2021-10-14 15:35:59|              2021|                 10|                      14|                    287|                15|                  35|                  59|
|2021-11-25 10:35:55|              2021|                 11|                      25|                    329|                10|                  35|                  55|
+-------------------+------------------+-------------------+------------------------+-----------------------+------------------+-----------------

## 2. Funciones para trabajar con strings

### 2-1. Transformación de un string

Para eliminar espacios sobrantes:

* ltrim, rtrim, trim

In [21]:
df_strings = spark.read.parquet('./data/strings')

In [22]:
df_strings.show()

+-------+
| nombre|
+-------+
| Spark |
+-------+



In [25]:
# Vamos a eliminar los espacios sobrantes en la palabra Spark

df_strings.select(
    ltrim('nombre').alias('ltrim'), # Elimina espacios a la izquierda
    rtrim('nombre').alias('rtrim'), # Elimina espacios a la derecha
    trim('nombre').alias('trim') # Elimina espacios tanto a la izquierda como a la derecha
).show()

+------+------+-----+
| ltrim| rtrim| trim|
+------+------+-----+
|Spark | Spark|Spark|
+------+------+-----+



Para rellenar un string con un caracter fijo:

* lpad, rpad

In [28]:
df_strings.select(
    trim(col('nombre')).alias('trim') # Primero eliminamos los espacios en blanco
).select(
    lpad(col('trim'), 8, '-').alias('lpad'), # Le decimos el número total de caracteres que queremos y el caracter que queremos añadir a la izquierda
    rpad(col('trim'), 8, '=').alias('rpad') # Le decimos el número total de caracteres que queremos y el caracter que queremos añadir a la derecha
).show()

+--------+--------+
|    lpad|    rpad|
+--------+--------+
|---Spark|Spark===|
+--------+--------+



Para concatenar, cambiar entre mayúsculas y minúsculas y revertir un string:

* concat_ws, lower, upper, initcap, reverse

In [29]:
df_strings1 = spark.createDataFrame([('Spark', 'es', 'maravilloso')], ['sujeto', 'verbo', 'adjetivo'])

df_strings1.show()

+------+-----+-----------+
|sujeto|verbo|   adjetivo|
+------+-----+-----------+
| Spark|   es|maravilloso|
+------+-----+-----------+



In [31]:
df_strings1.select(
    concat_ws(' ', col('sujeto'), col('verbo'), col('adjetivo')).alias('frase') # Concatenamos las tres columnas con un espacio
).select(
    col('frase'), # Al resultado de la instrucción anterior...
    lower(col('frase')).alias('minuscula'), # ...lo pasamos todo a minúsculas
    upper(col('frase')).alias('mayuscula'), # ...lo pasamos todo a mayúsculas
    initcap(col('frase')).alias('initcap'), # ...ponemos en mayúscula la inicial de cada palabra
    reverse(col('frase')).alias('reversa') # ...revertimos el orden de los caracteres del string
).show()

+--------------------+--------------------+--------------------+--------------------+--------------------+
|               frase|           minuscula|           mayuscula|             initcap|             reversa|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|Spark es maravilloso|spark es maravilloso|SPARK ES MARAVILLOSO|Spark Es Maravilloso|osollivaram se krapS|
+--------------------+--------------------+--------------------+--------------------+--------------------+



### 2-2. Aplicación de expresiones regulares

Nos permite reemplazar alguna parte de un string o extraer ciertas partes de un string según un patrón:

* regexp_replace

In [34]:
df_strings2 = spark.createDataFrame([(' voy a casa por mis llaves',)], ['frase'])

df_strings2.show(truncate=False)

+--------------------------+
|frase                     |
+--------------------------+
| voy a casa por mis llaves|
+--------------------------+



In [35]:
# Reemplazamos un patrón por lo que nosotros queramos

df_strings2.select(
    regexp_replace(col('frase'), 'voy|por', 'ir').alias('nueva_frase') # Sustituímos las palabras 'voy' y 'por' por 'ir'
).show(truncate=False)

+------------------------+
|nueva_frase             |
+------------------------+
| ir a casa ir mis llaves|
+------------------------+



## 3. Funciones para trabajar con colecciones

Están diseñadas para trabajar con tipos de datos complejos como array, map y estructuras.

### 3-1. Trabajar con arrays:

* size, sort_array, array_contains, explode

In [37]:
df_colecc_parquet = spark.read.parquet('./data/colecciones/parquet/')
df_colecc_parquet.show(truncate=False)

+-----+--------------------------------------------+
|dia  |tareas                                      |
+-----+--------------------------------------------+
|lunes|[hacer la tarea, buscar agua, lavar el auto]|
+-----+--------------------------------------------+



In [38]:
df_colecc_parquet.printSchema()

root
 |-- dia: string (nullable = true)
 |-- tareas: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [40]:
df_colecc_parquet.select(
    size(col('tareas')).alias('tamaño'), # Nos da el tamaño de la colección
    sort_array(col('tareas')).alias('arreglo_ordenado'), # Nos ordena la collection ya sea por orden alfabético o numérico
    array_contains(col('tareas'), 'buscar agua').alias('buscar_agua') # Nos permite buscar valores específicos dentro de la collection
).show(truncate=False)

+------+--------------------------------------------+-----------+
|tamaño|arreglo_ordenado                            |buscar_agua|
+------+--------------------------------------------+-----------+
|3     |[buscar agua, hacer la tarea, lavar el auto]|true       |
+------+--------------------------------------------+-----------+



In [42]:
df_colecc_parquet.select(
    col('dia'),
    explode(col('tareas')).alias('tareas') # Transforma cada elemento de la collection en una fila del DF
).show()

+-----+--------------+
|  dia|        tareas|
+-----+--------------+
|lunes|hacer la tarea|
|lunes|   buscar agua|
|lunes| lavar el auto|
+-----+--------------+



### 3-2. Trabajar con json

Para convertir una cadena JSON en un tipo de datos de estructura de Spark, necesitamos describir su estructura, valga la redundancia.\
Para ello vamos a necesitar crear un esquema JSON que proporcionaremos luego a la función from_json para que determine cuál es la estructura de nuestra columna.

In [44]:
json_df_str = spark.read.parquet('./data/colecciones/JSON')
json_df_str.show(truncate=False)

+---------------------------------------------------------------------------+
|tareas_str                                                                 |
+---------------------------------------------------------------------------+
|{"dia": "lunes","tareas": ["hacer la tarea","buscar agua","lavar el auto"]}|
+---------------------------------------------------------------------------+



In [45]:
json_df_str.printSchema()

root
 |-- tareas_str: string (nullable = true)



In [46]:
# Creamos el esquema JSON con dos columnas cuyos nombres coinciden con las keys de la estructura JSON

schema_json = StructType(
    [
     StructField('dia', StringType(), True),
     StructField('tareas', ArrayType(StringType()), True)
    ]
)

In [47]:
# A from_json le pasamos la columna que contiene el string en forma de json y el esquema que va a necesitar para discernir 
# los tipos de datos que tiene dentro esa columna.

json_df = json_df_str.select(
    from_json(col('tareas_str'), schema_json).alias('por_hacer')
)

In [48]:
json_df.printSchema()

root
 |-- por_hacer: struct (nullable = true)
 |    |-- dia: string (nullable = true)
 |    |-- tareas: array (nullable = true)
 |    |    |-- element: string (containsNull = true)



In [49]:
# Ahora vamos a extraer información del json_df que hemos creado

json_df.select(
    col('por_hacer').getItem('dia'), # Obtenemos el día
    col('por_hacer').getItem('tareas'), # Obtenemos la lista de tareas
    col('por_hacer').getItem('tareas').getItem(0).alias('primer_tarea') # Obtenemos el primer elemento de la lista tareas
).show(truncate=False)

+-------------+--------------------------------------------+--------------+
|por_hacer.dia|por_hacer.tareas                            |primer_tarea  |
+-------------+--------------------------------------------+--------------+
|lunes        |[hacer la tarea, buscar agua, lavar el auto]|hacer la tarea|
+-------------+--------------------------------------------+--------------+



Finalmente vamos a convertir los datos del DF en un string JSON

In [50]:
# Invertimos las operaciones anteriores volviendo al string del que partimos

json_df.select(
    to_json(col('por_hacer'))
).show(truncate=False)

+-------------------------------------------------------------------------+
|to_json(por_hacer)                                                       |
+-------------------------------------------------------------------------+
|{"dia":"lunes","tareas":["hacer la tarea","buscar agua","lavar el auto"]}|
+-------------------------------------------------------------------------+



## 4. Funciones when, coalesce y lit

In [52]:
data = spark.read.parquet('./data/c8l4/part-00000-a9b42845-6edf-4329-996e-2528aa78bb4a-c000.snappy.parquet')
data.show()

+------+----+
|nombre|pago|
+------+----+
|  Jose|   1|
| Julia|   2|
| Katia|   1|
|  null|   3|
|  Raul|   3|
+------+----+



Si existe la necesidad de evaluar un valor frente a una lista de condiciones y devolver otro valor, entonces una solución típica es utilizar una declaración switch, que está disponible en la mayoría de los lenguajes de programación de alto nivel.

Cuando sea necesario hacer esto con el valor de una columna en un DF, entonces podremos usar la función **when()** para este fin, combinado con **otherwise()** que indica el caso por defecto, a aplicar si no se cumple la condición de when.

In [53]:
# Vamos a sustituir los valores numéricos por diferentes strings usando when / otherwise

data.select(
    col('nombre'),
    when(col('pago') == 1, 'pagado').when(col('pago') == 2, 'sin pagar').otherwise('sin iniciar').alias('pago')
).show()

+------+-----------+
|nombre|       pago|
+------+-----------+
|  Jose|     pagado|
| Julia|  sin pagar|
| Katia|     pagado|
|  null|sin iniciar|
|  Raul|sin iniciar|
+------+-----------+



Una de las formas de manejar nulos es convertirlos a valores que representen nulos en nuestra lógica de procesamiento de datos.\
La función **coalesce()** toma uno o más valores de columna y devuelve el primero que no es nulo. Cada argumento en la función **coalesce()** debe ser de tipo columna, por lo que si se desea completar algún valor que no aparece, debemos utilizar la función lit() o literal.

In [54]:
data.select(
    coalesce(col('nombre'), lit('sin nombre')).alias('nombre') # Sustituimos el null por el string 'sin nombre'
).show()

+----------+
|    nombre|
+----------+
|      Jose|
|     Julia|
|     Katia|
|sin nombre|
|      Raul|
+----------+



## 5. Funciones definidas por el usuario (UDF)

Tenemos cuatro opciones para crear las UDFs:

### 5-1. Registrar el UDF con la función register()

A **register()** le pasamos el nombre con el que vamos a llamar a la función, el nombre de la función que queremos registar y por último le proporcionamos el tipo de dato que devolverá la función.

In [63]:
# Creamos una función que retorne el cubo de un número

def f_cubo(n):
    return n * n * n

In [60]:
# Registramos la función

spark.udf.register('cubo', f_cubo, LongType())

<function __main__.f_cubo(n)>

In [61]:
# Vamos a crear una vista temporal de un range, con createOrReplaceTempView(<nombre de la vista>)

spark.range(1,10).createOrReplaceTempView('df_temp')

In [62]:
# Le pasamos una consulta SQL. Hacemos un select de la columna id y usamos la función cubo, para que obtenga el cubo de la columna id, 
# renombrado como 'cubo'; finalmente le indicamos que tiene que hacer la consulta desde la vista temporal que acabamos de crear

spark.sql("SELECT id, cubo(id) AS cubo FROM df_temp").show()

+---+----+
| id|cubo|
+---+----+
|  1|   1|
|  2|   8|
|  3|  27|
|  4|  64|
|  5| 125|
|  6| 216|
|  7| 343|
|  8| 512|
|  9| 729|
+---+----+



                                                                                

### 5-2. Crear el UDF con la funcion udf()

A la función udf() le pasamos como primer parámetro una función y como segundo parámetro le decimos el tipo de dato que va a retornar la UDF

In [64]:
def bienvenida(nombre):
    return ('Hola {}'.format(nombre))

In [65]:
# A la función udf() le pasamos como primer parámetro una función, en este caso una lambda a la que le pasamos la función que creamos nosotros,
# y como segundo parámetro le decimos el tipo de dato que va a retornar la UDF

bienvenida_udf = udf(lambda x: bienvenida(x), StringType())

In [66]:
# creamos un DF

df_nombre = spark.createDataFrame([('Jose',), ('Julia',)], ['nombre'])
df_nombre.show()

+------+
|nombre|
+------+
|  Jose|
| Julia|
+------+



In [67]:
# Probamos el UDF sobre el DF que acabamos de crear

df_nombre.select(
    col('nombre'),
    bienvenida_udf(col('nombre')).alias('bie_nombre')
).show()

+------+----------+
|nombre|bie_nombre|
+------+----------+
|  Jose| Hola Jose|
| Julia|Hola Julia|
+------+----------+



### 5-3. Notación @udf

A @udf le pasamos el tipo de dato que queremos que retorne la UDF e inmediatamente seguido definimos la función

In [68]:
# Creamos una UDF llamado mayuscula que pasa un string a mayúsculas

@udf(returnType=StringType())
def mayuscula(s):
    return s.upper()

In [69]:
# Usamos el UDF mayúscula

df_nombre.select(
    col('nombre'),
    mayuscula(col('nombre')).alias('may_nombre')
).show()


+------+----------+
|nombre|may_nombre|
+------+----------+
|  Jose|      JOSE|
| Julia|     JULIA|
+------+----------+



### 5-4. Pandas UDF

Antes, u no de los problemas predominantes con el uso de las UDFs en pyspark era que tenían un rendimiento más lento que las UDFs en el lenguaje scala Esto se debía a que las UDFs de pyspark requerían el movimiento de datos entre la máquina virtual de Java y Python, lo cual era bastante costoso.\
Para resolver este problema se introdujeron las UDFs de pandas o UDFs vectorizadas.

Para crear UDFs con pandas_udf, primero tenemos que crear una función indicando que recibe y devuelve datos tipo pandas.Series, y después le pasamos la función recién creada a pandas_udf junto con el tipo de dato que queremos que nos devuelva

In [70]:
# Creamos una función que devuelva el cubo de un número, pero como tipo de dato recibe y devuelve pandas.Series

def cubo_pandas(a: pd.Series) -> pd.Series: # Le decimos que recibe y devuelve datos tipo pandas.Series
    return a * a * a

In [72]:
# Creamos el pandas_udf pasándole la función que acabamos de crear y el tipo de dato que queremos que devuelva

cubo_udf = pandas_udf(cubo_pandas, returnType=LongType())

In [73]:
# Creamos una serie de pandas para probar la pandas UDF

x = pd.Series([1, 2, 3])

In [74]:
# Probamos nuestro función normal

print(cubo_pandas(x))

0     1
1     8
2    27
dtype: int64


In [77]:
# Creamos un DF para probar la UDF

df_probar_pandasUDF = spark.range(5)

In [79]:
df_probar_pandasUDF.show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



In [78]:
df_probar_pandasUDF.select(
    col('id'),
    cubo_udf(col('id')).alias('cubo_pandas')
).show()

+---+-----------+
| id|cubo_pandas|
+---+-----------+
|  0|          0|
|  1|          1|
|  2|          8|
|  3|         27|
|  4|         64|
+---+-----------+



## 6 Funciones de ventana

Las funciones de ventana en Spark operan en un grupo de filas y devuelven un valor único para cada fila de entrada.\
Con estas funciones podemos crear grupos y luego operar sobre estos grupos.\ 
Para realizar una operación en un grupo primero necesitamos particionar los datos utilizando la clase Window y dentro de la clase window la función partition_by.

In [80]:
# Supongamos que en este set de datos nos solicitan obtener los trabajadores con la evaluación más alta por departamento.
# Si se fijan, existen varios trabajadores por cada uno de los departamentos.

df_ventana = spark.read.parquet('./data/funciones_ventana.parquet')
df_ventana.show()

+-------+----+------------+----------+
| nombre|edad|departamento|evaluacion|
+-------+----+------------+----------+
| Lazaro|  45|      letras|        98|
|   Raul|  24|  matemática|        76|
|  Maria|  34|  matemática|        27|
|   Jose|  30|     química|        78|
| Susana|  51|     química|        98|
|   Juan|  44|      letras|        89|
|  Julia|  55|      letras|        92|
|  Kadir|  38|arquitectura|        39|
| Lilian|  23|arquitectura|        94|
|   Rosa|  26|      letras|        91|
|   Aian|  50|  matemática|        73|
|Yaneisy|  29|      letras|        89|
|Enrique|  40|     química|        92|
|    Jon|  25|arquitectura|        78|
|  Luisa|  39|arquitectura|        94|
+-------+----+------------+----------+



In [82]:
# Además de la window partition, hacemos un orderBy para poder después utilizar las funciones row_number, rank y dense_rank
# Perticionamos por departamento para que nos cree los grupos que necesitamos

windowSpec = Window.partitionBy('departamento').orderBy(desc('evaluacion'))

### 6-1. row_number

Se usa para dar el número de fila secuencial, comenzando desde 1 hasta el resultado de cada partición de ventana. Se aplica sobre una especificación de ventana que hayamos creado previamente y lo hacemos con la función over().

In [85]:
# Ahora numeraremos las filas por departamento, como le especificamos en la window partition, atendiendo a la evaluación de mayor a menor

df_ventana.withColumn('row_number', row_number().over(windowSpec)).show()

+-------+----+------------+----------+----------+
| nombre|edad|departamento|evaluacion|row_number|
+-------+----+------------+----------+----------+
| Lilian|  23|arquitectura|        94|         1|
|  Luisa|  39|arquitectura|        94|         2|
|    Jon|  25|arquitectura|        78|         3|
|  Kadir|  38|arquitectura|        39|         4|
| Lazaro|  45|      letras|        98|         1|
|  Julia|  55|      letras|        92|         2|
|   Rosa|  26|      letras|        91|         3|
|   Juan|  44|      letras|        89|         4|
|Yaneisy|  29|      letras|        89|         5|
|   Raul|  24|  matemática|        76|         1|
|   Aian|  50|  matemática|        73|         2|
|  Maria|  34|  matemática|        27|         3|
| Susana|  51|     química|        98|         1|
|Enrique|  40|     química|        92|         2|
|   Jose|  30|     química|        78|         3|
+-------+----+------------+----------+----------+



In [86]:
# Y ahora aplicamos a lo anterior un filter para obtener sólo los dos trabajadores con mejor evaluación de cada departamento

df_ventana.withColumn('row_number', row_number().over(windowSpec)).filter(col('row_number').isin(1,2)).show()

+-------+----+------------+----------+----------+
| nombre|edad|departamento|evaluacion|row_number|
+-------+----+------------+----------+----------+
| Lilian|  23|arquitectura|        94|         1|
|  Luisa|  39|arquitectura|        94|         2|
| Lazaro|  45|      letras|        98|         1|
|  Julia|  55|      letras|        92|         2|
|   Raul|  24|  matemática|        76|         1|
|   Aian|  50|  matemática|        73|         2|
| Susana|  51|     química|        98|         1|
|Enrique|  40|     química|        92|         2|
+-------+----+------------+----------+----------+



### 6-2. rank 

Se usa para proporcionar un rango al resultado dentro de una partición de ventana. La diferencia con **row_number** es que esta función deja huecos en el rango cuando hay empates.

In [88]:
# rank da el mismo número a los empates, pero lo tiene en cuenta y el siguiente valor no empatado no lo numera consecutivo, 
# por ejemplo, tenemos dos 1 y el siguiente es un 3

df_ventana.withColumn('rank', rank().over(windowSpec)).show()

+-------+----+------------+----------+----+
| nombre|edad|departamento|evaluacion|rank|
+-------+----+------------+----------+----+
| Lilian|  23|arquitectura|        94|   1|
|  Luisa|  39|arquitectura|        94|   1|
|    Jon|  25|arquitectura|        78|   3|
|  Kadir|  38|arquitectura|        39|   4|
| Lazaro|  45|      letras|        98|   1|
|  Julia|  55|      letras|        92|   2|
|   Rosa|  26|      letras|        91|   3|
|   Juan|  44|      letras|        89|   4|
|Yaneisy|  29|      letras|        89|   4|
|   Raul|  24|  matemática|        76|   1|
|   Aian|  50|  matemática|        73|   2|
|  Maria|  34|  matemática|        27|   3|
| Susana|  51|     química|        98|   1|
|Enrique|  40|     química|        92|   2|
|   Jose|  30|     química|        78|   3|
+-------+----+------------+----------+----+



### 6-3. dense_rank

Hace lo mismo que **rank**, pero sin dejar huecos cuando hay empate 

In [89]:
# dense_rank da el mismo número a los empates y el siguiente lo numera consecutivo. Aquí vemos que da dos 1 y el siguiente valor es un 2

df_ventana.withColumn('dense_rank', dense_rank().over(windowSpec)).show()

+-------+----+------------+----------+----------+
| nombre|edad|departamento|evaluacion|dense_rank|
+-------+----+------------+----------+----------+
| Lilian|  23|arquitectura|        94|         1|
|  Luisa|  39|arquitectura|        94|         1|
|    Jon|  25|arquitectura|        78|         2|
|  Kadir|  38|arquitectura|        39|         3|
| Lazaro|  45|      letras|        98|         1|
|  Julia|  55|      letras|        92|         2|
|   Rosa|  26|      letras|        91|         3|
|   Juan|  44|      letras|        89|         4|
|Yaneisy|  29|      letras|        89|         4|
|   Raul|  24|  matemática|        76|         1|
|   Aian|  50|  matemática|        73|         2|
|  Maria|  34|  matemática|        27|         3|
| Susana|  51|     química|        98|         1|
|Enrique|  40|     química|        92|         2|
|   Jose|  30|     química|        78|         3|
+-------+----+------------+----------+----------+



### 6-4. Agregaciones con especificaciones de ventana

Cuando trabajamos con funciones de agregación no es necesario utilizar la cláusula orderBy cuando creamos la especificación de ventana.

In [90]:
# Creamos una nueva especificación de ventana sin el orderBy

windowSpecAgg = Window.partitionBy('departamento')

In [92]:
# Vamos a combinar las dos especificaciones de ventana que hemos creado

(df_ventana.withColumn('min', min(col('evaluacion')).over(windowSpecAgg)) # Calcula el min sobre cada partición creada con windowSpecAgg
.withColumn('max', max(col('evaluacion')).over(windowSpecAgg)) # Calcula el max sobre cada partición creada con windowSpecAgg
.withColumn('avg', avg(col('evaluacion')).over(windowSpecAgg)) # Calcula el avg sobre cada partición creada con windowSpecAgg
.withColumn('row_number', row_number().over(windowSpec)) # Para aplicar row_number tenemos que usar la primera windowSpec, que tenía orderBy
 ).show()

+-------+----+------------+----------+---+---+------------------+----------+
| nombre|edad|departamento|evaluacion|min|max|               avg|row_number|
+-------+----+------------+----------+---+---+------------------+----------+
| Lilian|  23|arquitectura|        94| 39| 94|             76.25|         1|
|  Luisa|  39|arquitectura|        94| 39| 94|             76.25|         2|
|    Jon|  25|arquitectura|        78| 39| 94|             76.25|         3|
|  Kadir|  38|arquitectura|        39| 39| 94|             76.25|         4|
| Lazaro|  45|      letras|        98| 89| 98|              91.8|         1|
|  Julia|  55|      letras|        92| 89| 98|              91.8|         2|
|   Rosa|  26|      letras|        91| 89| 98|              91.8|         3|
|   Juan|  44|      letras|        89| 89| 98|              91.8|         4|
|Yaneisy|  29|      letras|        89| 89| 98|              91.8|         5|
|   Raul|  24|  matemática|        76| 27| 76|58.666666666666664|         1|

## 7. Catalyst Optimizer

>"La forma más fácil de escribir aplicaciones de procesamiento de datos eficiente es no preocuparse por ello y optimizar automáticamente sus aplicaciones de procesamiento de datos."

Esta es la promesa de SparkCatalyst, que es un optimizador de consultas y es el segundo componente principal del módulo sparkSQL.\
Este desempeña un papel importante para garantizar que la lógica de procesamiento de datos escrita en las APIs de Data Frame o SQL se ejecute de manera eficiente y rápida.\
Fue diseñado para minimizar los tiempos de respuesta de las consultas de un extremo a otro, así como para que sea extensible, de modo que los usuarios de Spark puedan inyectar código de usuario en el optimizador para realizar una optimización personalizada.

En alto nivel SparkCatalyst traduce la lógica de procesamiento de datos escrita por el usuario en un plan lógico, luego la optimiza utilizando heurística y finalmente convierte el plan lógico en un plan físico. El paso final es generar código basado en el plan físico.

* **Plan Lógico**

    1. El primer paso en el proceso de optimización de Catalyst es crear un plan lógico a partir de un objeto DF o del árbol de sintaxis abstracta de la consulta SQL analizada. El plan lógico es una representación interna de la lógica de procesamiento de datos del usuario en forma de árbol de operaciones y expresión.

    2. A continuación, Catalyst analiza el plan lógico para resolver las referencias y asegurarse de que sean válidas.

    3. Luego aplica al plan lógico un conjunto de optimizaciones basadas en reglas y en costos de procesamiento. Ambos tipos de optimización siguen el principio de podar los datos innecesarios lo antes posible y minimizar el costo por operador. Por ejemplo, durante esta fase de optimización, Catalyst puede decidir mover la condición del filtro antes de realizar una unión.

* **Plan Físico**

    1. Una vez que se optimiza el plan lógico Catalyst generará uno o más planes físicos utilizando los operadores físicos que coinciden con el motor de ejecución Spark.

    2. Además de las optimizaciones realizadas en la fase del plan lógico, la fase del plan físico realiza sus propias optimizaciones basadas en reglas. Estas optimizaciones siguen el mismo principio de poda de datos descrito anteriormente en el punto 3 del plan lógico.

    3. El último paso que realiza Catalyst es generar el código de bytes de Java del plan físico más económico.

In [98]:
df_vuelos = spark.read.parquet('./data/flights/vuelos.parquet')

In [94]:
df_vuelos.printSchema()

root
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAY: integer (nullable = true)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- FLIGHT_NUMBER: integer (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- SCHEDULED_DEPARTURE: integer (nullable = true)
 |-- DEPARTURE_TIME: integer (nullable = true)
 |-- DEPARTURE_DELAY: integer (nullable = true)
 |-- TAXI_OUT: integer (nullable = true)
 |-- WHEELS_OFF: integer (nullable = true)
 |-- SCHEDULED_TIME: integer (nullable = true)
 |-- ELAPSED_TIME: integer (nullable = true)
 |-- AIR_TIME: integer (nullable = true)
 |-- DISTANCE: integer (nullable = true)
 |-- WHEELS_ON: integer (nullable = true)
 |-- TAXI_IN: integer (nullable = true)
 |-- SCHEDULED_ARRIVAL: integer (nullable = true)
 |-- ARRIVAL_TIME: integer (nullable = true)
 |-- ARRIVAL_DELAY: integer (null

In [95]:
df_vuelos.show(5)

+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+----------------+--------------+-------------+-------------------+-------------+
|YEAR|MONTH|DAY|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|AIR_SYSTEM_DELAY|SECURITY_DELAY|AIRLINE_DELAY|LATE_AIRCRAFT_DELAY|WEATHER_DELAY|
+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+-

23/07/26 18:36:32 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [100]:
# Vamos a realizar una serie de transformaciones al DF para ver después en qué orden las ha ejecutado Catalyst

df_vuelos_nuevo = (df_vuelos.filter(col('MONTH').isin(6,7,8)) # Un filtrado para algunos de los meses
            .withColumn('dis_tiempo_aire', col('DISTANCE') / col('AIR_TIME')) # Agregamos una nueva columna
).select(
    col('AIRLINE'), # Nos quedamos con un par de columnas
    col('dis_tiempo_aire')
).where(col('AIRLINE').isin('AA', 'DL', 'AS')) # Y finalmente seleccionamos con where algunas aerolíneas

Para poder visualizar tanto el **plan lógico** como el **plan físico**, utilizaremos la función explain()

In [101]:
# Vamos a visualizar los planes lógico y físico con la función explain() con el parámetro True para que nos muestre ambos

df_vuelos_nuevo.explain(True)

# En el plan lógico optimizado se combinan el filtro del mes con el de la aerolínea: +- Filter (MONTH#943 IN (6,7,8) AND AIRLINE#946 IN (AA,DL,AS))
# En el plan física se podan los datos innecesarios, quedándose sólo con la información necesaria para la consulta

== Parsed Logical Plan ==
'Filter 'AIRLINE IN (AA,DL,AS)
+- Project [AIRLINE#946, dis_tiempo_aire#1005]
   +- Project [YEAR#942, MONTH#943, DAY#944, DAY_OF_WEEK#945, AIRLINE#946, FLIGHT_NUMBER#947, TAIL_NUMBER#948, ORIGIN_AIRPORT#949, DESTINATION_AIRPORT#950, SCHEDULED_DEPARTURE#951, DEPARTURE_TIME#952, DEPARTURE_DELAY#953, TAXI_OUT#954, WHEELS_OFF#955, SCHEDULED_TIME#956, ELAPSED_TIME#957, AIR_TIME#958, DISTANCE#959, WHEELS_ON#960, TAXI_IN#961, SCHEDULED_ARRIVAL#962, ARRIVAL_TIME#963, ARRIVAL_DELAY#964, DIVERTED#965, ... 8 more fields]
      +- Filter MONTH#943 IN (6,7,8)
         +- Relation [YEAR#942,MONTH#943,DAY#944,DAY_OF_WEEK#945,AIRLINE#946,FLIGHT_NUMBER#947,TAIL_NUMBER#948,ORIGIN_AIRPORT#949,DESTINATION_AIRPORT#950,SCHEDULED_DEPARTURE#951,DEPARTURE_TIME#952,DEPARTURE_DELAY#953,TAXI_OUT#954,WHEELS_OFF#955,SCHEDULED_TIME#956,ELAPSED_TIME#957,AIR_TIME#958,DISTANCE#959,WHEELS_ON#960,TAXI_IN#961,SCHEDULED_ARRIVAL#962,ARRIVAL_TIME#963,ARRIVAL_DELAY#964,DIVERTED#965,... 7 more fields