In [2]:
import pyspark
from pyspark.sql import SparkSession

# Creamos SparkSession
spark = SparkSession \
        .builder \
        .appName('Window_functions') \
        .master('local[*]') \
        .getOrCreate()

In [2]:
# See info about SparkSession and link to Spark UI
spark

Una función ventana es una función agregada aplicada a una partición o subconjunto del resultado de una consulta, devuelve un valor por cada fila del resultado de una consulta.

Para trabajar con ventanas se pueden utilizar las funciones agregadas normales y específicas.

Ejemplo Sparkbyexamples.
https://sparkbyexamples.com/pyspark/pyspark-window-functions/

Las funciones de ventana se utilizan para calcular resultados como rankings o numero de filas en una ventana (rango de filas de entrada). Son utiles cuando necesitamos realizar operaciones de agregacion en un marco de ventana especifico en las columnas de un DF.


Las funciones de la ventana PySpark operan en un grupo de filas (como marco, partición) y devuelven un valor único para cada fila de entrada. PySpark SQL admite tres tipos de funciones de ventana:
- funciones de rankings
- funciones analiticas
- funciones de agregacion

Para el caso de las funciones de rankings (rank,dense_rank) y analytics(lag,lead,cum_dist) será necesario usar la clausula order pasanlode la ventana creada. 

In [3]:
## Create pyspark df ##

schema = """ employee_name STRING, 
             department STRING, 
             salary INT  """

simpleData = (("James", "Sales", 3000), \
    ("Michael", "Sales", 4600),  \
    ("Robert", "Sales", 4100),   \
    ("Maria", "Finance", 3000),  \
    ("James", "Sales", 3000),    \
    ("Scott", "Finance", 3300),  \
    ("Jen", "Finance", 3900),    \
    ("Jeff", "Marketing", 3000), \
    ("Kumar", "Marketing", 2000),\
    ("Saif", "Sales", 4100) \
  )

df = spark.createDataFrame(data = simpleData, schema = schema)

df.printSchema()
df.show(truncate=False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: integer (nullable = true)

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+



# 1. PySpark window Rankings functions
Primero creamos nuestra ventana con la el metodo window, pasandole partittionby con la columna a particionar y orderby con la columna que ordenará la partición.


**row_number()** se usa para dar el número de fila o registro comenzando desde 1 hasta el resultado de cada partición de ventana.

In [19]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, desc

# Create window (parttition by department and order by salary asc)
windowSpec  = Window.partitionBy("department").orderBy("salary")

# Create column row_number using window with row_number() function
df2 = df.withColumn("row_number", row_number().over(windowSpec))
df2.show(truncate=False)


+-------------+----------+------+----------+
|employee_name|department|salary|row_number|
+-------------+----------+------+----------+
|James        |Sales     |3000  |1         |
|James        |Sales     |3000  |2         |
|Robert       |Sales     |4100  |3         |
|Saif         |Sales     |4100  |4         |
|Michael      |Sales     |4600  |5         |
|Maria        |Finance   |3000  |1         |
|Scott        |Finance   |3300  |2         |
|Jen          |Finance   |3900  |3         |
|Kumar        |Marketing |2000  |1         |
|Jeff         |Marketing |3000  |2         |
+-------------+----------+------+----------+



**rank()** obtiene un ranking/clasificacion de filas dentro de una particion de ventana ordenada. A los empates se les asigna el mismo rango, y se omiten los siguientes rangos.

ej si tiene 3 elementos en el rango 2, el siguiente rango en la lista sería el 5.

Si nos fijamos en el ejemplo hay 3 rankigs, uno para cada departamento.

Nuestra ventana esta particionada por departamento y ordenada por salario en orden ascendente.

rank() en caso de empates, por ejemplo las 2 primeras filas de salary, les da a las dos el mismo rango y omite el siguiente, saltandose el 2, lo mimo ocurre con las filas de Robert y Saif, empatan por salario de 4100 y se salta el 4.

In [20]:
from pyspark.sql.functions import rank

df.withColumn("rank", rank().over(windowSpec)).show()

+-------------+----------+------+----+
|employee_name|department|salary|rank|
+-------------+----------+------+----+
|        James|     Sales|  3000|   1|
|        James|     Sales|  3000|   1|
|       Robert|     Sales|  4100|   3|
|         Saif|     Sales|  4100|   3|
|      Michael|     Sales|  4600|   5|
|        Maria|   Finance|  3000|   1|
|        Scott|   Finance|  3300|   2|
|          Jen|   Finance|  3900|   3|
|        Kumar| Marketing|  2000|   1|
|         Jeff| Marketing|  3000|   2|
+-------------+----------+------+----+



**dense_rank()** obtiene un ranking/clasificacion de filas dentro de una particion de ventana ordenada.  La diferencia con rank es los rankings son consecutivos, no omitiendo ningun rango si hay empates.

En este caso vemos que en el empate entre los dos James, les da a los 2 el primer rango y pasa al 2, ocurriendo lo mismo con Robert y Saif dandole el 2.

In [21]:
from pyspark.sql.functions import dense_rank

df.withColumn("dense_rank", dense_rank().over(windowSpec)).show()

+-------------+----------+------+----------+
|employee_name|department|salary|dense_rank|
+-------------+----------+------+----------+
|        James|     Sales|  3000|         1|
|        James|     Sales|  3000|         1|
|       Robert|     Sales|  4100|         2|
|         Saif|     Sales|  4100|         2|
|      Michael|     Sales|  4600|         3|
|        Maria|   Finance|  3000|         1|
|        Scott|   Finance|  3300|         2|
|          Jen|   Finance|  3900|         3|
|        Kumar| Marketing|  2000|         1|
|         Jeff| Marketing|  3000|         2|
+-------------+----------+------+----------+



**percent_rank()** devuelve el percentil de las filas o registros dentro de una particion de ventana ordenada.

En este caso podemos ver que asigna el mismo percentil en caso de empate y sin saltarse los rangos de percentil consecutivos.

El percentil es una medida de posición usada en estadística que indica, una vez ordenados los datos de menor a mayor, el valor de la variable por debajo del cual se encuentra un porcentaje dado de observaciones en un grupo. Por ejemplo, el percentil 20.º es el valor bajo el cual se encuentran el 20 por ciento de las observaciones. Él y el 80% restante son mayores.

In [23]:
from pyspark.sql.functions import percent_rank

df.withColumn("percent_rank", percent_rank().over(windowSpec)).show()

+-------------+----------+------+------------+
|employee_name|department|salary|percent_rank|
+-------------+----------+------+------------+
|        James|     Sales|  3000|         0.0|
|        James|     Sales|  3000|         0.0|
|       Robert|     Sales|  4100|         0.5|
|         Saif|     Sales|  4100|         0.5|
|      Michael|     Sales|  4600|         1.0|
|        Maria|   Finance|  3000|         0.0|
|        Scott|   Finance|  3300|         0.5|
|          Jen|   Finance|  3900|         1.0|
|        Kumar| Marketing|  2000|         0.0|
|         Jeff| Marketing|  3000|         1.0|
+-------------+----------+------+------------+



**ntile()** devielve el ranking en base al argumento ntile dado. 

En el ejemplo le pasamos el 2 como argumento a ntile para que devuelva el ranking entre 2 valores (1 y 2)

In [27]:
from pyspark.sql.functions import ntile

df.withColumn("ntile", ntile(2).over(windowSpec)).show()

+-------------+----------+------+-----+
|employee_name|department|salary|ntile|
+-------------+----------+------+-----+
|        James|     Sales|  3000|    1|
|        James|     Sales|  3000|    1|
|       Robert|     Sales|  4100|    1|
|         Saif|     Sales|  4100|    2|
|      Michael|     Sales|  4600|    2|
|        Maria|   Finance|  3000|    1|
|        Scott|   Finance|  3300|    1|
|          Jen|   Finance|  3900|    2|
|        Kumar| Marketing|  2000|    1|
|         Jeff| Marketing|  3000|    2|
+-------------+----------+------+-----+



# 2. PySpark Window Analytic functions

**cume_dist()** devuelve la una distribucion acumulada de valores en una particion de ventana ordenada.

In [32]:
from pyspark.sql.functions import cume_dist

df.withColumn("cume_dist", cume_dist().over(windowSpec)).show()

+-------------+----------+------+------------------+
|employee_name|department|salary|         cume_dist|
+-------------+----------+------+------------------+
|        James|     Sales|  3000|               0.4|
|        James|     Sales|  3000|               0.4|
|       Robert|     Sales|  4100|               0.8|
|         Saif|     Sales|  4100|               0.8|
|      Michael|     Sales|  4600|               1.0|
|        Maria|   Finance|  3000|0.3333333333333333|
|        Scott|   Finance|  3300|0.6666666666666666|
|          Jen|   Finance|  3900|               1.0|
|        Kumar| Marketing|  2000|               0.5|
|         Jeff| Marketing|  3000|               1.0|
+-------------+----------+------+------------------+



**lag(col, offset)** utiliza una columna numerica para, en base al offset (valor numerico) desplazar los valores de la fila hacia atras.

En el ejemplo que, àra el departamento Sales, desplazamos los valores hacia atras de la columna salary 1 vez, por lo que Michael pasa a tener el salario que antes tenia Saif (de 4600 a 4100), Saif el salario que antes tenia Robert (de 4100 a 3000), y asi sucesivamente hasta James, que como es el primero no se puede desplazar y se le da un valor nulo.

In [34]:
from pyspark.sql.functions import lag  

df.withColumn("lag",lag("salary",1).over(windowSpec)).show()

+-------------+----------+------+----+
|employee_name|department|salary| lag|
+-------------+----------+------+----+
|        James|     Sales|  3000|null|
|        James|     Sales|  3000|3000|
|       Robert|     Sales|  4100|3000|
|         Saif|     Sales|  4100|4100|
|      Michael|     Sales|  4600|4100|
|        Maria|   Finance|  3000|null|
|        Scott|   Finance|  3300|3000|
|          Jen|   Finance|  3900|3300|
|        Kumar| Marketing|  2000|null|
|         Jeff| Marketing|  3000|2000|
+-------------+----------+------+----+



**lead(col, offset)** utiliza una columna numerica para, en base al offset (valor numerico) desplazar los valores de la fila hacia adelante.

En el ejemplo podemos ver que, para el departamento Sales, desplazamos los registros de la columna salary hacia adelante 1 vez. Por lo que, James pasaria a tener el salario de James (de 3000 a 3000), James pasaria a tener el salario de rober (de 3000 a 4100) y asi sucesivamente hasta Michael, que como no puede avanzar se le da un valor nulo.

In [36]:
from pyspark.sql.functions import lead    

df.withColumn("lead",lead("salary",1).over(windowSpec)).show()

+-------------+----------+------+----+
|employee_name|department|salary|lead|
+-------------+----------+------+----+
|        James|     Sales|  3000|3000|
|        James|     Sales|  3000|4100|
|       Robert|     Sales|  4100|4100|
|         Saif|     Sales|  4100|4600|
|      Michael|     Sales|  4600|null|
|        Maria|   Finance|  3000|3300|
|        Scott|   Finance|  3300|3900|
|          Jen|   Finance|  3900|null|
|        Kumar| Marketing|  2000|3000|
|         Jeff| Marketing|  3000|null|
+-------------+----------+------+----+



# 4. PySpark Window Analytic functions

De la misma manera, se puede realizar **operaciones de agregacion como maximos, minimos, totales, medias, etc por ventana** (en el caso del ejemplo recordemos que la ventana esta particionada por departamento y ordenada por salario en orden ascendente). 

Se hace de la misma forma, le pasamos columna a la funcion de agregacion y le añadimos la clausula over con la ventana creada previamente con la funcion windown. Cuando trabajamos con funciones agregadas dentro de una ventaan, no necesitamos la clausula orderby para ordenar la ventana.

Destacamos del ejemplo que utilizamos la funcion de ventana row_number para obtener el numero de registro en la ventana 
".withColumn("row",row_number().over(windowSpec)" y añadimos un where para que no solo muestre 1 vez el departamento y no muestre por cada registro ".where(col("row")==1)"

In [40]:
from pyspark.sql.functions import col,avg,sum,min,max,row_number

# Create window partitioned bt department column
windowSpecAgg  = Window.partitionBy("department")

df.withColumn("row",row_number().over(windowSpec)) \
  .withColumn("avg", avg(col("salary")).over(windowSpecAgg)) \
  .withColumn("sum", sum(col("salary")).over(windowSpecAgg)) \
  .withColumn("min", min(col("salary")).over(windowSpecAgg)) \
  .withColumn("max", max(col("salary")).over(windowSpecAgg)) \
  .where(col("row")==1).select("department","avg","sum","min","max") \
  .show()

+----------+------+-----+----+----+
|department|   avg|  sum| min| max|
+----------+------+-----+----+----+
|     Sales|3760.0|18800|3000|4600|
|   Finance|3400.0|10200|3000|3900|
| Marketing|2500.0| 5000|2000|3000|
+----------+------+-----+----+----+

