As Windows Functions retornam um único valor para cada grupo de linhas.

O Pyspark oferece suporte a 3 Windows Functions:

1. Ranking Functions
2. Analytic Functions
3. Agreggate Functions

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.window import Window

dados = [
         ("Anderson", "Vendas", "SP", 1500.00, 34,1000.00),
         ("Kennedy", "Vendas", "CE", 1200.00, 56, 2000.00),
         ("Bruno", "Vendas", "SP", 1100.00, 30, 2300.00),
         ("Maria", "Finanças", "CE", 3600.00, 24, 2300.00),
         ("Eduardo", "Finanças", "CE", 4500.00, 40, 2400.00),
         ("Mendes", "Finanças", "RS", 8000.00, 36, 1900.00),
         ("Kethlyn", "Finanças", "RS", 1200.00, 53, 1500.00),
         ("Thiago", "Marketing", "GO", 1100.00, 25, 1800.00),
         ('Carla', 'Marketing', 'GO', 2600.00, 50, 2100.00)
]

schema = ['nome','departamento','estado','salario','idade','bonus']

df = spark.createDataFrame(data=dados,schema=schema)

df.display()

nome,departamento,estado,salario,idade,bonus
Anderson,Vendas,SP,1500.0,34,1000.0
Kennedy,Vendas,CE,1200.0,56,2000.0
Bruno,Vendas,SP,1100.0,30,2300.0
Maria,Finanças,CE,3600.0,24,2300.0
Eduardo,Finanças,CE,4500.0,40,2400.0
Mendes,Finanças,RS,8000.0,36,1900.0
Kethlyn,Finanças,RS,1200.0,53,1500.0
Thiago,Marketing,GO,1100.0,25,1800.0
Carla,Marketing,GO,2600.0,50,2100.0


Retorna o numero da linah de acordo coma  coluna que foi particionada

1. Particionar o DataFrame: partitionBy
2. Escolher a coluna que será particionada e por qual coluna será ordenada 
3. Precisa particionar para a função row_number rodar em cima dessa partição, o row_number numera as linhas de cada partição feita

In [0]:
w0 = Window.partitionBy(F.col('departamento')).orderBy('salario')
df.withColumn('row_number',F.row_number().over(w0)).display()


nome,departamento,estado,salario,idade,bonus,row_number
Kethlyn,Finanças,RS,1200.0,53,1500.0,1
Maria,Finanças,CE,3600.0,24,2300.0,2
Eduardo,Finanças,CE,4500.0,40,2400.0,3
Mendes,Finanças,RS,8000.0,36,1900.0,4
Thiago,Marketing,GO,1100.0,25,1800.0,1
Carla,Marketing,GO,2600.0,50,2100.0,2
Bruno,Vendas,SP,1100.0,30,2300.0,1
Kennedy,Vendas,CE,1200.0,56,2000.0,2
Anderson,Vendas,SP,1500.0,34,1000.0,3


O rank numera de acordo com algo especifico.

Como um ranking mesmo.

Criar uma nova coluna para armazenar o ranking da partição anterior

Se tiver valores iguais, ele rankeia igualmente, porém cria lacunas.

In [0]:
df.withColumn('rank',F.rank().over(w0)).display()

nome,departamento,estado,salario,idade,bonus,rank
Kethlyn,Finanças,RS,1200.0,53,1500.0,1
Maria,Finanças,CE,3600.0,24,2300.0,2
Eduardo,Finanças,CE,4500.0,40,2400.0,3
Mendes,Finanças,RS,8000.0,36,1900.0,4
Thiago,Marketing,GO,1100.0,25,1800.0,1
Carla,Marketing,GO,2600.0,50,2100.0,2
Bruno,Vendas,SP,1100.0,30,2300.0,1
Kennedy,Vendas,CE,1200.0,56,2000.0,2
Anderson,Vendas,SP,1500.0,34,1000.0,3


Cria um ranking tambem, porém não cria lacunas caso haja empate

Se houver empate, coloca ambos na mesma posição do ranking, e o proximo continua na sequencia

In [0]:
df.withColumn('dense_rank',F.dense_rank().over(w0)).display()

nome,departamento,estado,salario,idade,bonus,dense_rank
Kethlyn,Finanças,RS,1200.0,53,1500.0,1
Maria,Finanças,CE,3600.0,24,2300.0,2
Eduardo,Finanças,CE,4500.0,40,2400.0,3
Mendes,Finanças,RS,8000.0,36,1900.0,4
Thiago,Marketing,GO,1100.0,25,1800.0,1
Carla,Marketing,GO,2600.0,50,2100.0,2
Bruno,Vendas,SP,1100.0,30,2300.0,1
Kennedy,Vendas,CE,1200.0,56,2000.0,2
Anderson,Vendas,SP,1500.0,34,1000.0,3


Também cria o ranking, porém com resultados percentuais

In [0]:
df.withColumn('percent_rank',F.percent_rank().over(w0)).display()

nome,departamento,estado,salario,idade,bonus,percent_rank
Kethlyn,Finanças,RS,1200.0,53,1500.0,0.0
Maria,Finanças,CE,3600.0,24,2300.0,0.3333333333333333
Eduardo,Finanças,CE,4500.0,40,2400.0,0.6666666666666666
Mendes,Finanças,RS,8000.0,36,1900.0,1.0
Thiago,Marketing,GO,1100.0,25,1800.0,0.0
Carla,Marketing,GO,2600.0,50,2100.0,1.0
Bruno,Vendas,SP,1100.0,30,2300.0,0.0
Kennedy,Vendas,CE,1200.0,56,2000.0,0.5
Anderson,Vendas,SP,1500.0,34,1000.0,1.0


Escolhe uma coluna especifica e um numero.

A função olha as linhas em sequencia da coluna,e verifica se possui linhas anteriores a cada uma.

Se possuir a quantidade de linhas anteriores informadas no numero, coloca qual é o valor da linha anterior na coluna de lag, se não tiver, coloca null

In [0]:
df.withColumn('lag',F.lag('salario',1).over(w0)).display()

nome,departamento,estado,salario,idade,bonus,lag
Kethlyn,Finanças,RS,1200.0,53,1500.0,
Maria,Finanças,CE,3600.0,24,2300.0,1200.0
Eduardo,Finanças,CE,4500.0,40,2400.0,3600.0
Mendes,Finanças,RS,8000.0,36,1900.0,4500.0
Thiago,Marketing,GO,1100.0,25,1800.0,
Carla,Marketing,GO,2600.0,50,2100.0,1100.0
Bruno,Vendas,SP,1100.0,30,2300.0,
Kennedy,Vendas,CE,1200.0,56,2000.0,1100.0
Anderson,Vendas,SP,1500.0,34,1000.0,1200.0


Faz o contrario do lag

A função olha as linhas em sequencia da coluna,e verifica se possui linhas a frente de cada uma.

Se possuir a quantidade de linhas a frente informadas no numero, coloca qual é o valor da linha a frente na coluna de lag, se não tiver, coloca null

In [0]:
df.withColumn('lead',F.lead('salario',2).over(w0)).display()

nome,departamento,estado,salario,idade,bonus,lead
Kethlyn,Finanças,RS,1200.0,53,1500.0,4500.0
Maria,Finanças,CE,3600.0,24,2300.0,8000.0
Eduardo,Finanças,CE,4500.0,40,2400.0,
Mendes,Finanças,RS,8000.0,36,1900.0,
Thiago,Marketing,GO,1100.0,25,1800.0,
Carla,Marketing,GO,2600.0,50,2100.0,
Bruno,Vendas,SP,1100.0,30,2300.0,1500.0
Kennedy,Vendas,CE,1200.0,56,2000.0,
Anderson,Vendas,SP,1500.0,34,1000.0,


In [0]:
(df.withColumn("row", F.row_number().over(w0))
  .withColumn("avg", F.avg(F.col("salario")).over(w0))
 .withColumn("sum", F.sum(F.col("salario")).over(w0))
 .withColumn("min", F.min(F.col("salario")).over(w0))
 .withColumn("max", F.max(F.col("salario")).over(w0))
 .select("row","departamento", "avg", "sum", "min", "max").display()
)

row,departamento,avg,sum,min,max
1,Finanças,1200.0,1200.0,1200.0,1200.0
2,Finanças,2400.0,4800.0,1200.0,3600.0
3,Finanças,3100.0,9300.0,1200.0,4500.0
4,Finanças,4325.0,17300.0,1200.0,8000.0
1,Marketing,1100.0,1100.0,1100.0,1100.0
2,Marketing,1850.0,3700.0,1100.0,2600.0
1,Vendas,1100.0,1100.0,1100.0,1100.0
2,Vendas,1150.0,2300.0,1100.0,1200.0
3,Vendas,1266.6666666666667,3800.0,1100.0,1500.0
