In [0]:
from pyspark.sql.functions import col, rank, dense_rank, cume_dist, row_number
from pyspark.sql import functions as F
from pyspark.sql.window import Window

Window functions are another way to aggregate data in PySpark.

They come from SQL window functions and work in much the same way. These functions can be used to calculate ranks and cumulative distributions, or to act as a rolling window that returns an aggregated result for each row.
Sometimes they are a great replacement for `for` loops, which, as already mentioned, perform poorly in Spark.

In [0]:
# Location of the diamonds sample CSV
path = '/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv'

# Read the file (its first line supplies the column names) and project the
# ten columns used below: cut / color / clarity are passed through as read,
# every other column is cast to float.
df = spark.read.csv(path, header=True).select(
    [
        col(name) if name in ('cut', 'color', 'clarity') else col(name).cast('float')
        for name in ('carat', 'cut', 'color', 'clarity', 'depth',
                     'table', 'price', 'x', 'y', 'z')
    ]
)

In [0]:
# Window spec: one partition per cut, rows ordered by ascending price
winspec = Window.partitionBy("cut").orderBy("price")

# Row number: the sequential position of each row inside its cut partition,
# following the price ordering of the window spec. Rows that tie on price
# still receive distinct consecutive numbers.
display(
    df
    .select('cut', 'price', row_number().over(winspec).alias('rowNumber'))
)

cut,price,rowNumber
Fair,337.0,1
Fair,361.0,2
Fair,369.0,3
Fair,371.0,4
Fair,416.0,5
Fair,496.0,6
Fair,497.0,7
Fair,527.0,8
Fair,536.0,9
Fair,563.0,10


In [0]:
"Rank has gaps when there are ties. Example 1..1..3"
# Reuses winspec (partition by cut, order by price) from the row-number cell;
# the rank is computed and projected in a single select.
display(
    df.select('cut', 'price', rank().over(winspec).alias('rank'))
)

cut,price,rank
Fair,337.0,1
Fair,361.0,2
Fair,369.0,3
Fair,371.0,4
Fair,416.0,5
Fair,496.0,6
Fair,497.0,7
Fair,527.0,8
Fair,536.0,9
Fair,563.0,10


In [0]:
"Dense rank is different than rank because it does not leaves gaps."
# Reuses winspec (partition by cut, order by price) from the row-number cell;
# the dense rank is computed and projected in a single select.
display(
    df.select('cut', 'price', dense_rank().over(winspec).alias('denseRank'))
)

cut,price,denseRank
Fair,337.0,1
Fair,361.0,2
Fair,369.0,3
Fair,371.0,4
Fair,416.0,5
Fair,496.0,6
Fair,497.0,7
Fair,527.0,8
Fair,536.0,9
Fair,563.0,10


In [0]:
# Rebind winspec for the aggregated frame: still partitioned by cut, but now
# ordered by the avg_price column that the aggregation below creates.
winspec = (
    Window
    .partitionBy("cut")
    .orderBy("avg_price")
)

# Mean price per (cut, color) group, followed by each group's cumulative
# distribution within its cut, rounded to 2 decimals.
display(
    df
    .groupBy('cut', 'color')
    .agg(F.mean('price').alias('avg_price'))
    .select('*', F.round(cume_dist().over(winspec), 2).alias('cumeDist'))
)

cut,color,avg_price,cumeDist
Fair,E,3682.3125,0.14
Fair,F,3827.003205128205,0.29
Fair,G,4239.254777070064,0.43
Fair,D,4291.061349693252,0.57
Fair,I,4685.4457142857145,0.71
Fair,J,4975.655462184874,0.86
Fair,H,5135.683168316832,1.0
Good,D,3405.3821752265862,0.14
Good,E,3423.6441586280816,0.29
Good,F,3495.7502750275025,0.43


In [0]:
# Rolling frame per color, ordered by carat: the previous row plus the
# current row (offsets -1 .. currentRow).
# NOTE(review): this frame spans two rows, while the output column is named
# 'range3' - confirm whether a three-row window was intended.
ws = (
    Window
    .partitionBy('color')
    .orderBy('carat')
    .rowsBetween(-1, Window.currentRow)
)

# Mean price per (color, carat), then the rolling sum of those means over ws
display(
    df
    .groupBy('color', 'carat')
    .agg(F.mean('price').alias('avg_price'))
    .select('carat', 'color', 'avg_price', F.sum('avg_price').over(ws).alias('range3'))
)

carat,color,avg_price,range3
0.2,D,367.0,367.0
0.21,D,386.0,753.0
0.22,D,404.0,790.0
0.23,D,493.3555555555556,897.3555555555556
0.24,D,509.2142857142857,1002.5698412698412
0.25,D,496.57142857142856,1005.7857142857142
0.26,D,560.8260869565217,1057.3975155279504
0.27,D,626.6333333333333,1187.4594202898552
0.28,D,597.25,1223.8833333333332
0.29,D,629.8181818181819,1227.068181818182


In [0]:
# Share of each color within its cut: count the rows per (cut, color), then
# divide each count by the total of the counts over a window partitioned by
# cut. Shown sorted by cut, largest share first.
display(
    df
    .groupBy('cut', 'color')
    .agg(F.count('cut').alias('obs_ct'))
    .select(
        '*',
        (col('obs_ct') / F.sum('obs_ct').over(Window.partitionBy('cut'))).alias('pct'),
    )
    .orderBy('cut', F.desc('pct'))
)


cut,color,obs_ct,pct
Fair,G,314,0.1950310559006211
Fair,F,312,0.1937888198757764
Fair,H,303,0.1881987577639751
Fair,E,224,0.1391304347826087
Fair,I,175,0.108695652173913
Fair,D,163,0.1012422360248447
Fair,J,119,0.0739130434782608
Good,E,933,0.1901752955564614
Good,F,909,0.1852833265389319
Good,G,871,0.1775377089278434
