In [0]:
%run ../includes/common_functions

In [0]:
%run ../includes/configuration

In [0]:
from pyspark.sql.functions import sum, count, countDistinct, desc, asc, rank, when, col
from pyspark.sql.window import Window

In [0]:
demo_df = spark.read.parquet("/mnt/formuladlgen/presentation/race_results")

In [0]:
grouped_df = demo_df\
        .groupBy("race_year", "driver_name")\
        .agg(sum("points").alias("total_points"), countDistinct("race_name").alias("total_races"))

In [0]:
driverRankSpec = Window.partitionBy("race_year").orderBy(desc("total_points"))
grouped_df.withColumn("rank", rank().over(driverRankSpec)).show(100)

In [0]:
driver_standings_df = demo_df\
.groupBy("race_year", "driver_name", "driver_nationality", "team")\
.agg(sum("points").alias("total_points"), count(when(col("position") == 1, True)).alias("wins"))

In [0]:
display(driver_standings_df)

race_year,driver_name,driver_nationality,team,total_points,wins
2013,Nico Hülkenberg,German,Sauber,51.0,0
2013,Kimi Räikkönen,Finnish,Lotus F1,183.0,1
2005,Alexander Wurz,Austrian,McLaren,6.0,0
2007,Rubens Barrichello,Brazilian,Honda,0.0,0
2007,Sakon Yamamoto,Japanese,Spyker,0.0,0
2004,Michael Schumacher,German,Ferrari,148.0,13
2011,Pastor Maldonado,Venezuelan,Williams,1.0,0
2009,Rubens Barrichello,Brazilian,Brawn,77.0,2
2010,Bruno Senna,Brazilian,HRT,0.0,0
2018,Pierre Gasly,French,Toro Rosso,29.0,0


In [0]:
driver_rank_spec = Window.partitionBy("race_year").orderBy(desc("total_points"), desc("wins")) 
final_driver_rankings_df = driver_standings_df.withColumn("rank", rank().over(driver_rank_spec))

In [0]:
final_driver_rankings_df.write.mode("overwrite").format("parquet").saveAsTable("f1_presentation.driver_rankings")

In [0]:
dbutils.notebook.exit("Success")