In [2]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Window Functions") \
    .config('spark.ui.port', '4050')\
    .getOrCreate()

# Window Functions


As Window Functions do Spark operam em um grupo de linhas (como quadro, partição) e retornam um único valor para cada linha de entrada. O Spark SQL oferece suporte a três tipos de funções de janela:

1. ranking functions
2. analytic functions
3. aggregate functions

| USO E SINTAXE - WINDOW FUNCTION | WINDOW FUNCTION DESCRIÇÃO| 
|---|----| 
| row_number(): Column | Retorna um número sequencial começando em 1 dentro de uma janela de partição.
| rank(): Column | Retorna a classificação de linhas em uma partição de janela, com lacunas.
| percent_rank(): Column | Retorna a classificação de percentil de linhas em uma partição de janela.
| dense_rank(): Column | Retorna a classificação de linhas dentro de uma partição de janela sem quaisquer lacunas. Onde, como Rank () retorna a classificação com lacunas.
| ntile(n: Int): Column | Retorna o id do ntile em uma partição de janela
| cume_dist(): Column | Retorna a distribuição cumulativa de valores em uma partição de janela
| lag(e: Column, offset: Int): Columnlag(columnName: String, offset: Int): Columnlag(columnName: String, offset: Int, defaultValue: Any): Column | Retorna o valor que é `offset` linhas antes da linha atual, e` null` se houver menos que `offset` linhas antes da linha atual.
| lead(columnName: String, offset: Int): Columnlead(columnName: String, offset: Int): Columnlead(columnName: String, offset: Int, defaultValue: Any): Column | Retorna o valor que é `offset` linhas após a linha atual, e` null` se houver menos de `offset` linhas após a linha atual.

## Criação do Dataframe

In [10]:
dados = [["James", "Sales", 3000],
    ["Michael", "Sales", 4600],
    ["Robert", "Sales", 4100],
    ["Maria", "Finance", 3000],
    ["James", "Sales", 3000],
    ["Scott", "Finance", 3300],
    ["Jen", "Finance", 3900],
    ["Jeff", "Marketing", 3000],
    ["Kumar", "Marketing", 2000],
    ["Saif", "Sales", 4100]
         ]
colunas = ["employee_name", "department", "salary"]
window_dataframe = spark.createDataFrame(dados, colunas)
window_dataframe.show()

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|        James|     Sales|  3000|
|      Michael|     Sales|  4600|
|       Robert|     Sales|  4100|
|        Maria|   Finance|  3000|
|        James|     Sales|  3000|
|        Scott|   Finance|  3300|
|          Jen|   Finance|  3900|
|         Jeff| Marketing|  3000|
|        Kumar| Marketing|  2000|
|         Saif|     Sales|  4100|
+-------------+----------+------+



## Spark Window Ranking functions

### row_number Window Function
row_number() window function is used to give the sequential row number starting from 1 to the result of each window partition.

In [11]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
windowSpec  = Window.partitionBy("department").orderBy("salary")

window_dataframe.withColumn("row_number",row_number().over(windowSpec)).show(truncate=False)

+-------------+----------+------+----------+
|employee_name|department|salary|row_number|
+-------------+----------+------+----------+
|James        |Sales     |3000  |1         |
|James        |Sales     |3000  |2         |
|Robert       |Sales     |4100  |3         |
|Saif         |Sales     |4100  |4         |
|Michael      |Sales     |4600  |5         |
|Maria        |Finance   |3000  |1         |
|Scott        |Finance   |3300  |2         |
|Jen          |Finance   |3900  |3         |
|Kumar        |Marketing |2000  |1         |
|Jeff         |Marketing |3000  |2         |
+-------------+----------+------+----------+



### rank Window Function
rank() window function is used to provide a rank to the result within a window partition. 
This function leaves gaps in rank when there are ties.

In [12]:
from pyspark.sql.functions import rank
window_dataframe.withColumn("rank",rank().over(windowSpec)).show()

+-------------+----------+------+----+
|employee_name|department|salary|rank|
+-------------+----------+------+----+
|        James|     Sales|  3000|   1|
|        James|     Sales|  3000|   1|
|       Robert|     Sales|  4100|   3|
|         Saif|     Sales|  4100|   3|
|      Michael|     Sales|  4600|   5|
|        Maria|   Finance|  3000|   1|
|        Scott|   Finance|  3300|   2|
|          Jen|   Finance|  3900|   3|
|        Kumar| Marketing|  2000|   1|
|         Jeff| Marketing|  3000|   2|
+-------------+----------+------+----+



### dense_rank Window Function
A função de janela dense_rank () é usada para obter o resultado com classificação de linhas dentro de uma partição de janela sem quaisquer lacunas. 
Isso é semelhante à diferença da função rank (), sendo que a função de classificação deixa lacunas na classificação quando há empates.

In [13]:
from pyspark.sql.functions import dense_rank
window_dataframe.withColumn("dense_rank",dense_rank().over(windowSpec)).show()

+-------------+----------+------+----------+
|employee_name|department|salary|dense_rank|
+-------------+----------+------+----------+
|        James|     Sales|  3000|         1|
|        James|     Sales|  3000|         1|
|       Robert|     Sales|  4100|         2|
|         Saif|     Sales|  4100|         2|
|      Michael|     Sales|  4600|         3|
|        Maria|   Finance|  3000|         1|
|        Scott|   Finance|  3300|         2|
|          Jen|   Finance|  3900|         3|
|        Kumar| Marketing|  2000|         1|
|         Jeff| Marketing|  3000|         2|
+-------------+----------+------+----------+



### percent_rank Window Function


In [14]:
from pyspark.sql.functions import percent_rank
window_dataframe.withColumn("percent_rank",percent_rank().over(windowSpec)).show()

+-------------+----------+------+------------+
|employee_name|department|salary|percent_rank|
+-------------+----------+------+------------+
|        James|     Sales|  3000|         0.0|
|        James|     Sales|  3000|         0.0|
|       Robert|     Sales|  4100|         0.5|
|         Saif|     Sales|  4100|         0.5|
|      Michael|     Sales|  4600|         1.0|
|        Maria|   Finance|  3000|         0.0|
|        Scott|   Finance|  3300|         0.5|
|          Jen|   Finance|  3900|         1.0|
|        Kumar| Marketing|  2000|         0.0|
|         Jeff| Marketing|  3000|         1.0|
+-------------+----------+------+------------+



### ntile Window Function
A função de janela ntile () retorna a classificação relativa das linhas de resultados dentro de uma partição de janela. No exemplo abaixo, usamos 2 como um argumento para ntile, portanto, ele retorna a classificação entre 2 valores (1 e 2)

In [15]:
from pyspark.sql.functions import ntile
window_dataframe.withColumn("ntile",ntile(2).over(windowSpec)).show()

+-------------+----------+------+-----+
|employee_name|department|salary|ntile|
+-------------+----------+------+-----+
|        James|     Sales|  3000|    1|
|        James|     Sales|  3000|    1|
|       Robert|     Sales|  4100|    1|
|         Saif|     Sales|  4100|    2|
|      Michael|     Sales|  4600|    2|
|        Maria|   Finance|  3000|    1|
|        Scott|   Finance|  3300|    1|
|          Jen|   Finance|  3900|    2|
|        Kumar| Marketing|  2000|    1|
|         Jeff| Marketing|  3000|    2|
+-------------+----------+------+-----+



## PySpark Window Analytic functions

### cume_dist Window Function
A função de janela cume_dist () é usada para obter a distribuição cumulativa de valores dentro de uma partição de janela.

In [16]:
from pyspark.sql.functions import cume_dist    
window_dataframe.withColumn("cume_dist",cume_dist().over(windowSpec)).show()

+-------------+----------+------+------------------+
|employee_name|department|salary|         cume_dist|
+-------------+----------+------+------------------+
|        James|     Sales|  3000|               0.4|
|        James|     Sales|  3000|               0.4|
|       Robert|     Sales|  4100|               0.8|
|         Saif|     Sales|  4100|               0.8|
|      Michael|     Sales|  4600|               1.0|
|        Maria|   Finance|  3000|0.3333333333333333|
|        Scott|   Finance|  3300|0.6666666666666666|
|          Jen|   Finance|  3900|               1.0|
|        Kumar| Marketing|  2000|               0.5|
|         Jeff| Marketing|  3000|               1.0|
+-------------+----------+------+------------------+



### lag Window Function
É o mesmo que a função LAG em SQL.

In [17]:
from pyspark.sql.functions import lag    
window_dataframe.withColumn("lag",lag("salary",2).over(windowSpec)).show()

+-------------+----------+------+----+
|employee_name|department|salary| lag|
+-------------+----------+------+----+
|        James|     Sales|  3000|null|
|        James|     Sales|  3000|null|
|       Robert|     Sales|  4100|3000|
|         Saif|     Sales|  4100|3000|
|      Michael|     Sales|  4600|4100|
|        Maria|   Finance|  3000|null|
|        Scott|   Finance|  3300|null|
|          Jen|   Finance|  3900|3000|
|        Kumar| Marketing|  2000|null|
|         Jeff| Marketing|  3000|null|
+-------------+----------+------+----+



### lead Window Function
É o mesmo que a função LEAD em SQL.

In [19]:
from pyspark.sql.functions import lead    
window_dataframe.withColumn("lead",lead("salary",2).over(windowSpec)).show()

+-------------+----------+------+----+
|employee_name|department|salary|lead|
+-------------+----------+------+----+
|        James|     Sales|  3000|4100|
|        James|     Sales|  3000|4100|
|       Robert|     Sales|  4100|4600|
|         Saif|     Sales|  4100|null|
|      Michael|     Sales|  4600|null|
|        Maria|   Finance|  3000|3900|
|        Scott|   Finance|  3300|null|
|          Jen|   Finance|  3900|null|
|        Kumar| Marketing|  2000|null|
|         Jeff| Marketing|  3000|null|
+-------------+----------+------+----+



## PySpark Window Aggregate Functions
Nesta seção, explicarei como calcular sum, min, max para cada departamento usando as funções de janela PySpark SQL Aggregate e WindowSpec. 
Ao trabalhar com funções agregadas, não precisamos usar ordem por cláusula.  

In [20]:
windowSpecAgg  = Window.partitionBy("department")
from pyspark.sql.functions import col,avg,sum,min,max,row_number 
window_dataframe.withColumn("row",row_number().over(windowSpec)) \
  .withColumn("avg", avg(col("salary")).over(windowSpecAgg)) \
  .withColumn("sum", sum(col("salary")).over(windowSpecAgg)) \
  .withColumn("min", min(col("salary")).over(windowSpecAgg)) \
  .withColumn("max", max(col("salary")).over(windowSpecAgg)) \
  .where(col("row")==1).select("department","avg","sum","min","max") \
  .show()

+----------+------+-----+----+----+
|department|   avg|  sum| min| max|
+----------+------+-----+----+----+
|     Sales|3760.0|18800|3000|4600|
|   Finance|3400.0|10200|3000|3900|
| Marketing|2500.0| 5000|2000|3000|
+----------+------+-----+----+----+



# Referências

1. [Scala Spark Window Functions Examples](https://sparkbyexamples.com/spark/spark-sql-window-functions/)
2. [PySpark Window Functions Examples](https://sparkbyexamples.com/pyspark/pyspark-window-functions/)
3. [SQL Window Functions - SQLite example](https://sqlite.org/windowfunctions.html)