In [0]:
# import Libraries
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# the S3 path
s3_path = "s3a://columbia-gr5069-main/raw/"

# read the CSV files into spark dataFrames

df_races = (spark.read.csv(f"{s3_path}/races.csv", header = True))

df_drivers = (spark.read.csv(f"{s3_path}/drivers.csv", header = True))

df_pit_stops = (spark.read.csv(f"{s3_path}/pit_stops.csv", header = True))

df_results = (spark.read.csv(f"{s3_path}/results.csv", header = True))

df_lap_times = (spark.read.csv(f"{s3_path}/lap_times.csv", header = True))

# 1. [10 pts] What was the average time each driver spent at the pit stop for each race?

In [0]:
# average pit stop time per driver, per race
df_avg_pitstop = (
    df_pit_stops
    .groupBy("raceId", "driverId")
    .agg(F.mean("milliseconds").alias("avg_pitstop_time_ms"))
)
# join with race names and driver names
races_sub = df_races.select("raceId", "name")
drivers_sub = df_drivers.select(
    "driverId",
    F.concat_ws(" ", "forename", "surname").alias("driver_name")
)

# create a new DataFrame with race name and driver name
df_avg_pitstop_named = (
    df_avg_pitstop
    .join(races_sub, "raceId", "left")
    .join(drivers_sub, "driverId", "left")
    .select("raceId", "name", "driverId", "driver_name", "avg_pitstop_time_ms")
)

display(df_avg_pitstop_named.orderBy("raceId", "driverId"))

raceId,name,driverId,driver_name,avg_pitstop_time_ms
1000,Hungarian Grand Prix,1,Lewis Hamilton,21480.0
1000,Hungarian Grand Prix,154,Romain Grosjean,21733.0
1000,Hungarian Grand Prix,20,Sebastian Vettel,23111.0
1000,Hungarian Grand Prix,4,Fernando Alonso,21795.0
1000,Hungarian Grand Prix,8,Kimi Räikkönen,23150.0
1000,Hungarian Grand Prix,807,Nico Hülkenberg,22308.0
1000,Hungarian Grand Prix,815,Sergio Pérez,22561.0
1000,Hungarian Grand Prix,817,Daniel Ricciardo,21364.0
1000,Hungarian Grand Prix,822,Valtteri Bottas,21337.0
1000,Hungarian Grand Prix,825,Kevin Magnussen,25126.0


# 2. [20 pts] Rank the average time spent at the pit stop in order of who won each race

In [0]:
results_sub = df_results.select("raceId", "driverId", "position")

# join average pit stops with finishing position
df_joined = (
    df_avg_pitstop_named.alias("pit")
    .join(results_sub.alias("res"), 
          (F.col("pit.raceId") == F.col("res.raceId")) & 
          (F.col("pit.driverId") == F.col("res.driverId")),
          "inner")
    .select(
        F.col("pit.raceId"), 
        F.col("pit.name").alias("race_name"),
        "res.driverId",
        "pit.driver_name",
        "res.position",
        "pit.avg_pitstop_time_ms"
    )
)

# order by race and finishing position
rank_window = Window.partitionBy("raceId").orderBy("pit.avg_pitstop_time_ms")

ranked_df = df_joined.withColumn(
    "finishing_rank_in_race",
    F.rank().over(rank_window)
)

display(ranked_df.orderBy("raceId", "position"))

raceId,race_name,driverId,driver_name,position,avg_pitstop_time_ms,finishing_rank_in_race
1000,Hungarian Grand Prix,1,Lewis Hamilton,1,21480.0,4
1000,Hungarian Grand Prix,154,Romain Grosjean,10,21733.0,8
1000,Hungarian Grand Prix,843,Brendon Hartley,11,21831.0,10
1000,Hungarian Grand Prix,807,Nico Hülkenberg,12,22308.0,13
1000,Hungarian Grand Prix,839,Esteban Ocon,13,22258.0,12
1000,Hungarian Grand Prix,815,Sergio Pérez,14,22561.0,14
1000,Hungarian Grand Prix,828,Marcus Ericsson,15,22640.0,15
1000,Hungarian Grand Prix,845,Sergey Sirotkin,16,21509.0,5
1000,Hungarian Grand Prix,840,Lance Stroll,17,21291.0,1
1000,Hungarian Grand Prix,20,Sebastian Vettel,2,23111.0,16


# 3. [20 pts] Insert the missing code (e.g: ALO for Alonso) for drivers based on the 'drivers' dataset

In [0]:
# fill code if it's NULL or empty with uppercase of first 3 letters of surname
df_drivers_updated = df_drivers.withColumn(
    "code",
    F.when(
        F.col("code").isNull() | (F.col("code") == " "),
        F.upper(F.substring("surname", 1, 3)) 
    ).otherwise(F.col("code"))
)

display(df_drivers_updated.select("driverId", "forename", "surname", "code").orderBy("driverId"))

driverId,forename,surname,code
1,Lewis,Hamilton,HAM
10,Timo,Glock,GLO
100,Érik,Comas,\N
101,David,Brabham,\N
102,Ayrton,Senna,\N
103,Éric,Bernard,\N
104,Christian,Fittipaldi,\N
105,Michele,Alboreto,\N
106,Olivier,Beretta,\N
107,Roland,Ratzenberger,\N


# 4. [20 pts] Who is the youngest and oldest driver for each race? Create a new column called “Age”

In [0]:
# join drivers to races via results to know which driver was in which race
df_driver_race_join = (
    df_results.select("raceId", "driverId")
    .join(df_races.select("raceId", "date"), "raceId")
    .join(df_drivers.select("driverId", "dob", 
                            F.concat_ws(" ", "forename", "surname").alias("driver_name")),
          "driverId")
)

# try to calculate the age in years
df_driver_race_with_age = (
    df_driver_race_join
    .withColumn(
        "age",
        F.round(F.datediff(F.col("date"), F.col("dob")) / 365, 2)
    )
)

display(df_driver_race_with_age.orderBy("raceId", "driverId"))

# the min/max ages per race
df_min_max_age = (
    df_driver_race_with_age
    .groupBy("raceId")
    .agg(
        F.min("age").alias("youngest_age"),
        F.max("age").alias("oldest_age")
    )
)

# join back to get the actual driver with that youngest or oldest age
df_youngest_drivers = (
    df_driver_race_with_age.alias("all")
    .join(df_min_max_age.alias("mm"), 
          (F.col("all.raceId") == F.col("mm.raceId")) & 
          (F.col("all.age") == F.col("mm.youngest_age")), 
          "inner")
    .select(
        F.col("all.raceId"),
        F.col("all.driver_name"),
        F.col("all.age").alias("youngest_age")
    )
    .distinct()
)

df_oldest_drivers = (
    df_driver_race_with_age.alias("all")
    .join(df_min_max_age.alias("mm"), 
          (F.col("all.raceId") == F.col("mm.raceId")) & 
          (F.col("all.age") == F.col("mm.oldest_age")), 
          "inner")
    .select(
        F.col("all.raceId"),
        F.col("all.driver_name"),
        F.col("all.age").alias("oldest_age")
    )
    .distinct()
)

print("Youngest drivers per race:")
display(df_youngest_drivers)

print("Oldest drivers per race:")
display(df_oldest_drivers)


driverId,raceId,date,dob,driver_name,age
1,1,2009-03-29,1985-01-07,Lewis Hamilton,24.24
10,1,2009-03-29,1982-03-18,Timo Glock,27.05
12,1,2009-03-29,1985-07-25,Nelson Piquet Jr.,23.69
13,1,2009-03-29,1981-04-25,Felipe Massa,27.95
15,1,2009-03-29,1974-07-13,Jarno Trulli,34.73
16,1,2009-03-29,1983-01-11,Adrian Sutil,26.23
17,1,2009-03-29,1976-08-27,Mark Webber,32.61
18,1,2009-03-29,1980-01-19,Jenson Button,29.21
2,1,2009-03-29,1977-05-10,Nick Heidfeld,31.91
20,1,2009-03-29,1987-07-03,Sebastian Vettel,21.75


Youngest drivers per race:


raceId,driver_name,youngest_age
175,Ricardo Zonta,22.97
191,Esteban Tuero,19.89
589,Tony Brise,23.54
794,Jimmy Reece,25.55
150,Fernando Alonso,19.94
402,Alex Caffi,23.64
479,Andrea de Cesaris,23.22
578,Jody Scheckter,25.1
774,Masten Gregory,26.54
885,Esteban Gutiérrez,21.82


Oldest drivers per race:


raceId,driver_name,oldest_age
25,David Coulthard,37.27
366,René Arnoux,41.18
325,Nelson Piquet,37.84
1033,Kimi Räikkönen,40.78
672,Jack Brabham,42.29
865,Michael Schumacher,43.42
640,Graham Hill,42.58
169,Jean Alesi,36.2
129,Eddie Irvine,36.53
400,René Arnoux,39.26


# 5. [20 pts] For a given race, which driver has the most wins and losses?

In [0]:
# find each race’s last position
df_race_pos = (
    df_results
    .groupBy("raceId")
    .agg(F.max("position").alias("last_position"))
)

# obtain the race name from df_races and position from results
df_results_race_join= (
    df_results.alias("res")
    .join(df_races.alias("rac"), "raceId")
    .select(
        "res.raceId",
        "res.driverId",
        "res.position",
        "rac.name" 
    )
)

# count total wins per race
df_wins = (
    df_results_race_join
    .filter(F.col("position") == 1)
    .groupBy("name", "driverId")
    .agg(F.count("*").alias("win_count"))
)

# find the driver with the maximum wins for each race
win_window = Window.partitionBy("name").orderBy(F.desc("win_count"))
df_wins_ranked = df_wins.withColumn("win_rank", F.rank().over(win_window))

df_most_wins = (
    df_wins_ranked
    .filter(F.col("win_rank") == 1)
    .join(
        df_drivers.select("driverId", 
                          F.concat_ws(" ", "forename", "surname").alias("driver_name")),
        "driverId", "left"
    )
    .select("name", "driver_name", "win_count")
    .orderBy("name")
)

print("Drivers with the MOST WINS for each race:")
display(df_most_wins)

# count total losses

# get who is last in each race
df_last = (
    df_results_race_join.alias("rr")
    .join(df_race_pos.alias("rp"), "raceId")
    .filter(F.col("rr.position") == F.col("rp.last_position"))  # rr is the last place finisher for each race
    
)

df_losses = (
    df_last
    .groupBy("name", "driverId")
    .agg(F.count("*").alias("loss_count"))
)

loss_window = Window.partitionBy("name").orderBy(F.desc("loss_count"))
df_losses_ranked = df_losses.withColumn("loss_rank", F.rank().over(loss_window))

df_most_losses = (
    df_losses_ranked
    .filter(F.col("loss_rank") == 1)
    .join(
        df_drivers.select("driverId", 
                          F.concat_ws(" ", "forename", "surname").alias("driver_name")),
        "driverId", "left"
    )
    .select("name", "driver_name", "loss_count")
    .orderBy("name")
)

print("Drivers with the MOST LOSSES for each race:")
display(df_most_losses)

Drivers with the MOST WINS for each race:


name,driver_name,win_count
70th Anniversary Grand Prix,Max Verstappen,1
Abu Dhabi Grand Prix,Lewis Hamilton,5
Argentine Grand Prix,Juan Fangio,4
Australian Grand Prix,Michael Schumacher,4
Austrian Grand Prix,Max Verstappen,4
Azerbaijan Grand Prix,Sergio Pérez,2
Bahrain Grand Prix,Lewis Hamilton,5
Belgian Grand Prix,Michael Schumacher,6
Brazilian Grand Prix,Alain Prost,6
British Grand Prix,Lewis Hamilton,8


Drivers with the MOST LOSSES for each race:


name,driver_name,loss_count
70th Anniversary Grand Prix,Kevin Magnussen,1
Abu Dhabi Grand Prix,Kimi Räikkönen,3
Argentine Grand Prix,John Watson,5
Argentine Grand Prix,Jean-Pierre Jarier,5
Argentine Grand Prix,Carlos Menditeguy,5
Australian Grand Prix,Michael Schumacher,10
Austrian Grand Prix,Riccardo Patrese,8
Azerbaijan Grand Prix,Max Verstappen,3
Bahrain Grand Prix,Jenson Button,5
Belgian Grand Prix,Riccardo Patrese,12


# 6. (self question) For each race, which driver had the best (lowest) average lap time across the entire race?

In [0]:
# calculate average lap time for each driver in each race
df_avg_lap_time = (
    df_lap_times
    .groupBy("raceId", "driverId")
    .agg(F.mean("milliseconds").alias("avg_lap_time_ms"))
)

# window partitioned by raceId, ordering by avg_lap_time_ms ascending
race_window = Window.partitionBy("raceId").orderBy("avg_lap_time_ms")

df_ranked_lap_time = (
    df_avg_lap_time
    .withColumn("lap_time_rank", F.rank().over(race_window))
)


# join the ranking DataFrame
df_best_avg_lap = (
    df_ranked_lap_time.alias("rl")
    .join(drivers_sub.alias("d"), "driverId", "left")
    .join(races_sub.alias("r"), "raceId", "left")
    .filter(F.col("lap_time_rank") == 1)  
    .select(
        "rl.raceId",
        "r.name",
        "rl.driverId",
        "d.driver_name",
        "rl.avg_lap_time_ms"
    )
    .orderBy("rl.raceId")
)

display(df_best_avg_lap)

raceId,name,driverId,driver_name,avg_lap_time_ms
1,Australian Grand Prix,6,Kazuki Nakajima,91822.23529411764
10,Hungarian Grand Prix,1,Lewis Hamilton,84341.08571428571
100,British Grand Prix,15,Jarno Trulli,82924.94871794872
1000,Hungarian Grand Prix,1,Lewis Hamilton,83377.52857142857
1001,Belgian Grand Prix,20,Sebastian Vettel,113965.36363636365
1002,Italian Grand Prix,1,Lewis Hamilton,87065.7358490566
1003,Singapore Grand Prix,1,Lewis Hamilton,109370.67213114754
1004,Russian Grand Prix,1,Lewis Hamilton,98965.679245283
1005,Japanese Grand Prix,1,Lewis Hamilton,98812.49056603774
1006,United States Grand Prix,8,Kimi Räikkönen,101047.19642857143
