###Window Function
#### A window function operates on a group of rows (a window) and returns a single value for each row



#####WINDOW FUNCTIONS USAGE & SYNTAX	PYSPARK WINDOW FUNCTIONS DESCRIPTION
######row_number()                                     : Column	Returns a sequential number starting from 1 within a window partition
######rank()                                           : Column	Returns the rank of rows within a window partition, with gaps.
######percent_rank()                                   : Column	Returns the percentile rank of rows within a window partition.
######dense_rank()                                     : Column	Returns the rank of rows within a window partition without any gaps, whereas rank() returns the rank with gaps.
######ntile(n: Int)                                    : Column	Returns the ntile id in a window partition
######cume_dist()                                      : Column	Returns the cumulative distribution of values within a window partition
######lag(e: Column, offset: Int)                      : Column
######lag(columnName: String, offset: Int)             : Column
######lag(columnName: String, offset: Int, defaultValue: Any): Column	Returns the value that is `offset` rows before the current row, and `null` if there are fewer than `offset` rows before the current row.
######lead(e: Column, offset: Int): Column
######lead(columnName: String, offset: Int): Column
######lead(columnName: String, offset: Int, defaultValue: Any): Column	Returns the value that is `offset` rows after the current row, and `null` if there are fewer than `offset` rows after the current row.

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import desc, rank, dense_rank,col, aggregate, countDistinct
from pyspark.sql.functions import sum as _sum

In [0]:
%run "../includes/configuration"

In [0]:

# Load the race_results dataset stored as Parquet under the presentation folder.
# NOTE(review): `presentation_folderpath` is not defined in this notebook; presumably it
# comes from the "../includes/configuration" notebook run above -- confirm.
# No "header" option is set: that is a CSV reader option, and Parquet files store their
# own schema, so it has no effect on a Parquet read.
df_demo = spark.read.parquet(f"{presentation_folderpath}/race_results")
# Preview the first 10 rows only.
display(df_demo.limit(10))

race_year,race_name,race_date,circuit_location,driver_name,driver_number,driver_nationality,team,grid,fastest_lap,race_time,points,position,created_date
2018,Australian Grand Prix,2018-03-25T05:10:00.000+0000,Melbourne,Sergey Sirotkin,35,Russian,Williams,19,3,\N,0.0,,2021-12-23T06:13:06.418+0000
2018,Australian Grand Prix,2018-03-25T05:10:00.000+0000,Melbourne,Marcus Ericsson,9,Swedish,Sauber,17,4,\N,0.0,,2021-12-23T06:13:06.418+0000
2018,Australian Grand Prix,2018-03-25T05:10:00.000+0000,Melbourne,Pierre Gasly,10,French,Toro Rosso,20,13,\N,0.0,,2021-12-23T06:13:06.418+0000
2018,Australian Grand Prix,2018-03-25T05:10:00.000+0000,Melbourne,Kevin Magnussen,20,Danish,Haas F1 Team,5,21,\N,0.0,,2021-12-23T06:13:06.418+0000
2018,Australian Grand Prix,2018-03-25T05:10:00.000+0000,Melbourne,Romain Grosjean,8,French,Haas F1 Team,6,23,\N,0.0,,2021-12-23T06:13:06.418+0000
2018,Australian Grand Prix,2018-03-25T05:10:00.000+0000,Melbourne,Brendon Hartley,28,New Zealander,Toro Rosso,16,57,\N,0.0,15.0,2021-12-23T06:13:06.418+0000
2018,Australian Grand Prix,2018-03-25T05:10:00.000+0000,Melbourne,Lance Stroll,18,Canadian,Williams,13,55,+1:18.288,0.0,14.0,2021-12-23T06:13:06.418+0000
2018,Australian Grand Prix,2018-03-25T05:10:00.000+0000,Melbourne,Charles Leclerc,16,Monegasque,Sauber,18,56,+1:15.759,0.0,13.0,2021-12-23T06:13:06.418+0000
2018,Australian Grand Prix,2018-03-25T05:10:00.000+0000,Melbourne,Esteban Ocon,31,French,Force India,14,57,+1:00.278,0.0,12.0,2021-12-23T06:13:06.418+0000
2018,Australian Grand Prix,2018-03-25T05:10:00.000+0000,Melbourne,Sergio Pérez,11,Mexican,Force India,12,51,+46.817,0.0,11.0,2021-12-23T06:13:06.418+0000


In [0]:
# Collapse the race results to one row per (race_year, driver_name), carrying the
# summed points and the number of distinct race names for that driver in that year.
df_demo_grouped = (
    df_demo
    .groupBy("race_year", "driver_name")
    .agg(
        _sum("points").alias("total_points"),
        countDistinct("race_name").alias("num_races"),
    )
)

In [0]:
# Window covering each race_year, ordered from the highest total_points down.
driverRankSpec = (
    Window
    .partitionBy("race_year")
    .orderBy(col("total_points").desc())
)
# rank() assigns tied rows the same rank and leaves gaps after the tie.
df_dem_ranked = df_demo_grouped.withColumn(
    "rank",
    rank().over(driverRankSpec),
)

In [0]:
# Show the grouped totals together with the "rank" column computed per race_year.
display(df_dem_ranked)

race_year,driver_name,total_points,num_races,rank
1950,Nino Farina,30.0,6,1
1950,Luigi Fagioli,28.0,6,2
1950,Juan Fangio,27.0,6,3
1950,Louis Rosier,13.0,6,4
1950,Alberto Ascari,11.0,4,5
1950,Johnnie Parsons,9.0,1,6
1950,Bill Holland,6.0,1,7
1950,Prince Bira,5.0,4,8
1950,Peter Whitehead,4.0,3,9
1950,Louis Chiron,4.0,5,9


In [0]:
# Window covering each race_year, ordered from the highest total_points down.
driverDenseRankSpec = (
    Window
    .partitionBy("race_year")
    .orderBy(col("total_points").desc())
)
# dense_rank() assigns tied rows the same rank and leaves no gaps after the tie.
df_demo_dense_ranked = df_demo_grouped.withColumn(
    "dense_rank",
    dense_rank().over(driverDenseRankSpec),
)
display(df_demo_dense_ranked)

race_year,driver_name,total_points,num_races,dense_rank
1950,Nino Farina,30.0,6,1
1950,Luigi Fagioli,28.0,6,2
1950,Juan Fangio,27.0,6,3
1950,Louis Rosier,13.0,6,4
1950,Alberto Ascari,11.0,4,5
1950,Johnnie Parsons,9.0,1,6
1950,Bill Holland,6.0,1,7
1950,Prince Bira,5.0,4,8
1950,Peter Whitehead,4.0,3,9
1950,Louis Chiron,4.0,5,9


In [0]:
# Keep only rows with race_year after 2000, sorted by highest total_points first and
# by race_year ascending among equal totals.
df_del = (
    df_demo_dense_ranked
    .select("*")
    .filter(col("race_year") > 2000)
    .orderBy(col("total_points").desc(), col("race_year"))
)
display(df_del)

race_year,driver_name,total_points,num_races,dense_rank
2019,Lewis Hamilton,413.0,21,1
2018,Lewis Hamilton,408.0,21,1
2013,Sebastian Vettel,397.0,19,1
2011,Sebastian Vettel,392.0,19,1
2016,Nico Rosberg,385.0,21,1
2014,Lewis Hamilton,384.0,19,1
2015,Lewis Hamilton,381.0,19,1
2016,Lewis Hamilton,380.0,21,2
2017,Lewis Hamilton,363.0,20,1
2020,Lewis Hamilton,347.0,16,1


In [0]:
# Column.over() expects a WindowSpec; a bare column name such as "total_points" is not
# a window definition and is rejected by PySpark. Build the window explicitly, using the
# same per-year, highest-points-first ordering as the other ranking cells in this notebook.
df_rank_exmpl = df_demo_grouped.withColumn(
    "dense_rank",
    dense_rank().over(
        Window.partitionBy("race_year").orderBy(desc("total_points"))
    ),
)