In [0]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StringType
from pyspark.sql.functions import when, col, upper, substring, datediff, to_date, max, min, row_number, count, sum

pit_stops_path = "s3://columbia-gr5069-main/raw/pit_stops.csv"
drivers_path = "s3://columbia-gr5069-main/raw/drivers.csv"
results_path = "s3://columbia-gr5069-main/raw/results.csv"
races_path = "s3://columbia-gr5069-main/raw/races.csv"
status_path = "s3://columbia-gr5069-main/raw/status.csv"
constructors_path = "s3://columbia-gr5069-main/raw/constructors.csv"

pit_stops_df = spark.read.csv(pit_stops_path, header=True, inferSchema=True)
drivers_df = spark.read.csv(drivers_path, header=True, inferSchema=True)
results_df = spark.read.csv(results_path, header=True, inferSchema=True)
races_df = spark.read.csv(races_path, header=True, inferSchema=True)
status_df = spark.read.csv(status_path, header=True, inferSchema=True)
constructors_df = spark.read.csv(constructors_path, header=True, inferSchema=True)

In [0]:
display(pit_stops_df.limit(10))
display(drivers_df.limit(10))
display(results_df.limit(10))
display(races_df.limit(10))
display(status_df.limit(10))
display(constructors_df.limit(10))

raceId,driverId,stop,lap,time,duration,milliseconds
841,153,1,1,2025-03-23T17:05:23Z,26.898,26898
841,30,1,1,2025-03-23T17:05:52Z,25.021,25021
841,17,1,11,2025-03-23T17:20:48Z,23.426,23426
841,4,1,12,2025-03-23T17:22:34Z,23.251,23251
841,13,1,13,2025-03-23T17:24:10Z,23.842,23842
841,22,1,13,2025-03-23T17:24:29Z,23.643,23643
841,20,1,14,2025-03-23T17:25:17Z,22.603,22603
841,814,1,14,2025-03-23T17:26:03Z,24.863,24863
841,816,1,14,2025-03-23T17:26:50Z,25.259,25259
841,67,1,15,2025-03-23T17:27:34Z,25.342,25342


driverId,driverRef,number,code,forename,surname,dob,nationality,url
1,hamilton,44,HAM,Lewis,Hamilton,1985-01-07,British,http://en.wikipedia.org/wiki/Lewis_Hamilton
2,heidfeld,\N,HEI,Nick,Heidfeld,1977-05-10,German,http://en.wikipedia.org/wiki/Nick_Heidfeld
3,rosberg,6,ROS,Nico,Rosberg,1985-06-27,German,http://en.wikipedia.org/wiki/Nico_Rosberg
4,alonso,14,ALO,Fernando,Alonso,1981-07-29,Spanish,http://en.wikipedia.org/wiki/Fernando_Alonso
5,kovalainen,\N,KOV,Heikki,Kovalainen,1981-10-19,Finnish,http://en.wikipedia.org/wiki/Heikki_Kovalainen
6,nakajima,\N,NAK,Kazuki,Nakajima,1985-01-11,Japanese,http://en.wikipedia.org/wiki/Kazuki_Nakajima
7,bourdais,\N,BOU,Sébastien,Bourdais,1979-02-28,French,http://en.wikipedia.org/wiki/S%C3%A9bastien_Bourdais
8,raikkonen,7,RAI,Kimi,Räikkönen,1979-10-17,Finnish,http://en.wikipedia.org/wiki/Kimi_R%C3%A4ikk%C3%B6nen
9,kubica,88,KUB,Robert,Kubica,1984-12-07,Polish,http://en.wikipedia.org/wiki/Robert_Kubica
10,glock,\N,GLO,Timo,Glock,1982-03-18,German,http://en.wikipedia.org/wiki/Timo_Glock


resultId,raceId,driverId,constructorId,number,grid,position,positionText,positionOrder,points,laps,time,milliseconds,fastestLap,rank,fastestLapTime,fastestLapSpeed,statusId
1,18,1,1,22,1,1,1,1,10.0,58,1:34:50.616,5690616,39,2,1:27.452,218.3,1
2,18,2,2,3,5,2,2,2,8.0,58,+5.478,5696094,41,3,1:27.739,217.586,1
3,18,3,3,7,7,3,3,3,6.0,58,+8.163,5698779,41,5,1:28.090,216.719,1
4,18,4,4,5,11,4,4,4,5.0,58,+17.181,5707797,58,7,1:28.603,215.464,1
5,18,5,1,23,3,5,5,5,4.0,58,+18.014,5708630,43,1,1:27.418,218.385,1
6,18,6,3,8,13,6,6,6,3.0,57,\N,\N,50,14,1:29.639,212.974,11
7,18,7,5,14,17,7,7,7,2.0,55,\N,\N,22,12,1:29.534,213.224,5
8,18,8,6,1,15,8,8,8,1.0,53,\N,\N,20,4,1:27.903,217.18,5
9,18,9,2,4,2,\N,R,9,0.0,47,\N,\N,15,9,1:28.753,215.1,4
10,18,10,7,12,18,\N,R,10,0.0,43,\N,\N,23,13,1:29.558,213.166,3


raceId,year,round,circuitId,name,date,time,url,fp1_date,fp1_time,fp2_date,fp2_time,fp3_date,fp3_time,quali_date,quali_time,sprint_date,sprint_time
1,2009,1,1,Australian Grand Prix,2009-03-29,06:00:00,http://en.wikipedia.org/wiki/2009_Australian_Grand_Prix,\N,\N,\N,\N,\N,\N,\N,\N,\N,\N
2,2009,2,2,Malaysian Grand Prix,2009-04-05,09:00:00,http://en.wikipedia.org/wiki/2009_Malaysian_Grand_Prix,\N,\N,\N,\N,\N,\N,\N,\N,\N,\N
3,2009,3,17,Chinese Grand Prix,2009-04-19,07:00:00,http://en.wikipedia.org/wiki/2009_Chinese_Grand_Prix,\N,\N,\N,\N,\N,\N,\N,\N,\N,\N
4,2009,4,3,Bahrain Grand Prix,2009-04-26,12:00:00,http://en.wikipedia.org/wiki/2009_Bahrain_Grand_Prix,\N,\N,\N,\N,\N,\N,\N,\N,\N,\N
5,2009,5,4,Spanish Grand Prix,2009-05-10,12:00:00,http://en.wikipedia.org/wiki/2009_Spanish_Grand_Prix,\N,\N,\N,\N,\N,\N,\N,\N,\N,\N
6,2009,6,6,Monaco Grand Prix,2009-05-24,12:00:00,http://en.wikipedia.org/wiki/2009_Monaco_Grand_Prix,\N,\N,\N,\N,\N,\N,\N,\N,\N,\N
7,2009,7,5,Turkish Grand Prix,2009-06-07,12:00:00,http://en.wikipedia.org/wiki/2009_Turkish_Grand_Prix,\N,\N,\N,\N,\N,\N,\N,\N,\N,\N
8,2009,8,9,British Grand Prix,2009-06-21,12:00:00,http://en.wikipedia.org/wiki/2009_British_Grand_Prix,\N,\N,\N,\N,\N,\N,\N,\N,\N,\N
9,2009,9,20,German Grand Prix,2009-07-12,12:00:00,http://en.wikipedia.org/wiki/2009_German_Grand_Prix,\N,\N,\N,\N,\N,\N,\N,\N,\N,\N
10,2009,10,11,Hungarian Grand Prix,2009-07-26,12:00:00,http://en.wikipedia.org/wiki/2009_Hungarian_Grand_Prix,\N,\N,\N,\N,\N,\N,\N,\N,\N,\N


statusId,status
1,Finished
2,Disqualified
3,Accident
4,Collision
5,Engine
6,Gearbox
7,Transmission
8,Clutch
9,Hydraulics
10,Electrical


constructorId,constructorRef,name,nationality,url
1,mclaren,McLaren,British,http://en.wikipedia.org/wiki/McLaren
2,bmw_sauber,BMW Sauber,German,http://en.wikipedia.org/wiki/BMW_Sauber
3,williams,Williams,British,http://en.wikipedia.org/wiki/Williams_Grand_Prix_Engineering
4,renault,Renault,French,http://en.wikipedia.org/wiki/Renault_in_Formula_One
5,toro_rosso,Toro Rosso,Italian,http://en.wikipedia.org/wiki/Scuderia_Toro_Rosso
6,ferrari,Ferrari,Italian,http://en.wikipedia.org/wiki/Scuderia_Ferrari
7,toyota,Toyota,Japanese,http://en.wikipedia.org/wiki/Toyota_Racing
8,super_aguri,Super Aguri,Japanese,http://en.wikipedia.org/wiki/Super_Aguri_F1
9,red_bull,Red Bull,Austrian,http://en.wikipedia.org/wiki/Red_Bull_Racing
10,force_india,Force India,Indian,http://en.wikipedia.org/wiki/Racing_Point_Force_India


1. What was the average time each driver spent at the pit stop for each race?

In [0]:
avg_pit_time_df = pit_stops_df.groupBy("raceId", "driverId").agg(F.round(F.avg("milliseconds"), 2).alias("avg_pit_time"))
display(avg_pit_time_df.limit(10))

raceId,driverId,avg_pit_time
858,4,20762.5
880,154,22302.0
890,814,22834.67
904,815,22324.5
952,830,22358.5
964,13,22463.0
977,20,21750.0
980,828,23670.67
1000,4,21795.0
1008,844,23593.0


2. Rank the average time spent at the pit stop in order of who won each race?

- In this case, if a driver didn't finish the race, it will show "Null" in the "position" column. I simply removed these drivers thorught a filtering condition.


In [0]:
results_clean = results_df.join(avg_pit_time_df, ["raceId", "driverId"], how="inner").filter(col("position").cast("int").isNotNull()).withColumn("position", col("position").cast("int"))

windowSpec = Window.partitionBy("raceId").orderBy("position")
display(results_clean.select("raceId", "driverId", "position", "avg_pit_time").limit(20))

raceId,driverId,position,avg_pit_time
841,20,1,23319.5
841,1,2,23213.0
841,808,3,25109.0
841,4,4,24055.0
841,17,5,24058.67
841,18,6,20950.33
841,13,7,24145.67
841,67,8,24221.0
841,16,9,24924.5
841,814,10,24597.5


3. Insert the missing code (e.g: ALO for Alonso) for drivers based on the 'drivers' dataset

- We can easily observe that the "code" column is origially from the "driverRef" column, where the value is set by the first 3 uppercase alphabets of corresponding string from the column "driverRef".

In [0]:
missing_code_df = drivers_df.filter((F.col("code") == "\\N")|(F.col("code").isNull()))
missing_code_df.select("driverId", "driverRef", "code")

DataFrame[driverId: int, driverRef: string, code: string]

In [0]:
drivers_df = drivers_df.withColumn(
    "code", 
    when(col("code")=="\\N", upper(substring(col("driverRef"), 1, 3))).otherwise(col("code"))
)
display(drivers_df.limit(10))

driverId,driverRef,number,code,forename,surname,dob,nationality,url
1,hamilton,44,HAM,Lewis,Hamilton,1985-01-07,British,http://en.wikipedia.org/wiki/Lewis_Hamilton
2,heidfeld,\N,HEI,Nick,Heidfeld,1977-05-10,German,http://en.wikipedia.org/wiki/Nick_Heidfeld
3,rosberg,6,ROS,Nico,Rosberg,1985-06-27,German,http://en.wikipedia.org/wiki/Nico_Rosberg
4,alonso,14,ALO,Fernando,Alonso,1981-07-29,Spanish,http://en.wikipedia.org/wiki/Fernando_Alonso
5,kovalainen,\N,KOV,Heikki,Kovalainen,1981-10-19,Finnish,http://en.wikipedia.org/wiki/Heikki_Kovalainen
6,nakajima,\N,NAK,Kazuki,Nakajima,1985-01-11,Japanese,http://en.wikipedia.org/wiki/Kazuki_Nakajima
7,bourdais,\N,BOU,Sébastien,Bourdais,1979-02-28,French,http://en.wikipedia.org/wiki/S%C3%A9bastien_Bourdais
8,raikkonen,7,RAI,Kimi,Räikkönen,1979-10-17,Finnish,http://en.wikipedia.org/wiki/Kimi_R%C3%A4ikk%C3%B6nen
9,kubica,88,KUB,Robert,Kubica,1984-12-07,Polish,http://en.wikipedia.org/wiki/Robert_Kubica
10,glock,\N,GLO,Timo,Glock,1982-03-18,German,http://en.wikipedia.org/wiki/Timo_Glock


4. Who is the youngest and oldest driver for each race? Create a new column called “Age”

- In this case, Age is calculated by taking the difference in days between the race date and the driver's date of birth, then dividing it by 365.25 (In other to account for leap years) and casting into int number, we can get the number of full birthdays

In [0]:
drivers_df = drivers_df.withColumn("dob", to_date(col("dob")))
races_df = races_df.withColumn("date", to_date(col("date")))

results_with_dob = results_df.join(drivers_df.select("driverId", "dob"), on="driverId", how="inner").join(races_df.select("raceId", "date"), on="raceId", how="inner")

results_with_age = results_with_dob.withColumn("Age", (datediff(col("date"), col("dob"))/365.25).cast("int"))

age_window_youngest = Window.partitionBy("raceId").orderBy("Age")
age_window_oldest = Window.partitionBy("raceId").orderBy(col("Age").desc())

youngest = results_with_age.withColumn("row_num", row_number().over(age_window_youngest)).filter("row_num = 1")
oldest = results_with_age.withColumn("row_num", row_number().over(age_window_oldest)).filter("row_num = 1")

display(youngest.select("raceId", "driverId", "Age").limit(20))
display(oldest.select("raceId", "driverId", "Age").limit(20))

raceId,driverId,Age
1,67,20
2,67,20
3,67,20
4,67,20
5,67,20
6,67,20
7,67,20
8,67,20
9,67,20
10,153,19


raceId,driverId,Age
1,22,36
2,22,36
3,22,36
4,22,36
5,22,36
6,22,37
7,22,37
8,22,37
9,22,37
10,22,37


5. For a given race, which driver has the most wins and losses?

In [0]:
results_clean = results_df.filter(col("position").cast("int").isNotNull()).withColumn("position", col("position").cast("int"))
results_flagged = results_clean.withColumn("win", when(col("position") == 1, 1).otherwise(0)).withColumn("not_win_but_completed", when((col("position") > 1) & (col("statusId") == 1), 1).otherwise(0))

cum_window = Window.partitionBy("driverId").orderBy("raceId").rowsBetween(Window.unboundedPreceding, -1)

results_cum_stats = results_flagged.withColumn("cum_wins", sum("win").over(cum_window)).withColumn("cum_losses", sum("not_win_but_completed").over(cum_window))

win_window = Window.partitionBy("raceId").orderBy(col("cum_wins").desc())
loss_window = Window.partitionBy("raceId").orderBy(col("cum_losses").desc())

most_wins = results_cum_stats.withColumn("rank_win", row_number().over(win_window)).filter(col("rank_win") == 1).select("raceId", "driverId", "cum_wins")

most_losses = results_cum_stats.withColumn("rank_loss", row_number().over(loss_window)).filter(col("rank_loss") == 1).select("raceId", "driverId", "cum_losses")

display(most_wins.dropna().limit(20))
display(most_losses.dropna().limit(20))

raceId,driverId,cum_wins
2,18,1
3,18,2
4,18,2
5,18,3
6,18,4
7,18,5
8,18,6
9,18,6
10,18,6
11,18,6


raceId,driverId,cum_losses
2,2,1
3,2,2
4,2,3
5,10,4
6,22,5
7,3,5
8,3,6
9,3,7
10,3,8
11,3,9


6. Which constructor's overall performance is best in each year?

In [0]:
results_with_year = results_df.join(races_df.select("raceId", "year"), on="raceId", how="left")

constructor_points = results_with_year.groupBy("year", "constructorId").agg(sum("points").alias("total_points"))

constructor_points_named = constructor_points.join(
    constructors_df.select("constructorId", "name"), on="constructorId", how="left"
)

constructor_ranked = constructor_points_named.orderBy("year", col("total_points").desc())

display(constructor_ranked.limit(20))

constructorId,year,total_points,name
51,1950,89.0,Alfa Romeo
6,1950,21.0,Ferrari
154,1950,20.0,Talbot-Lago
113,1950,14.0,Kurtis Kraft
105,1950,11.0,Maserati
150,1950,10.0,Deidt
141,1950,3.0,Simca
110,1950,0.0,Lesovsky
156,1950,0.0,Marchese
160,1950,0.0,Wetteroth
