In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, when, count, floor, datediff, min, max
from pyspark.sql import functions as F
from pyspark.sql.window import Window

In [0]:
# load related dataset from S3
df_pitstops = spark.read.csv('s3://columbia-gr5069-main/raw/pit_stops.csv', header=True)
df_results = spark.read.csv('s3://columbia-gr5069-main/raw/results.csv', header=True)
df_drivers = spark.read.csv('s3://columbia-gr5069-main/raw/drivers.csv', header=True)
df_races = spark.read.csv('s3://columbia-gr5069-main/raw/races.csv', header=True)
df_laptimes = spark.read.csv('s3://columbia-gr5069-main/raw/lap_times.csv', header=True)
df_sprint_results = spark.read.csv('s3://columbia-gr5069-main/raw/sprint_results.csv', header=True)


1. [10 pts] What was the average time each driver spent at the pit stop for each race?


In [0]:
avg_pit_stop = df_pitstops.groupBy("raceId", "driverId").agg(avg("milliseconds").alias("avg_pit_time"))
display(avg_pit_stop)

raceId,driverId,avg_pit_time
843,39,26049.0
844,67,21976.33333333333
844,2,21743.75
844,20,20402.25
849,17,24416.0
856,20,20077.5
869,819,18606.0
873,818,30198.5
876,20,20707.0
878,3,21346.0


2. [20 pts] Rank the average time spent at the pit stop in order of who won each race

In [0]:
# How you decided to deal with drivers who did not finish the race: 
# Drivers who do not finish the race typically have a NULL value in the position column of the results dataset.
# I chose to exclude all drivers with NULL position by using isNotNull:
# Filter out drivers who did not finish (no position)
results_filtered = df_results.filter(col("position").isNotNull()) \
    .select("raceId", "driverId", "position")
# Join with average pit stop times
joined_df = avg_pit_stop.join(results_filtered, on=["raceId", "driverId"], how="inner")
# Add driver name and race name from other datasets
driver_names = df_drivers.select("driverId", "forename", "surname")
race_names = df_races.select("raceId", "name")
# Select columns and order by position joined result
ranked_avg_pitstops = joined_df \
    .join(driver_names, "driverId", "left") \
    .join(race_names, "raceId", "left") \
    .select("raceId", "name", "driverId", "forename", "surname", "position", "avg_pit_time") \
    .orderBy("raceId", F.col("position").cast("int"))
display(ranked_avg_pitstops)

raceId,name,driverId,forename,surname,position,avg_pit_time
1000,Hungarian Grand Prix,838,Stoffel,Vandoorne,\N,21732.0
1000,Hungarian Grand Prix,1,Lewis,Hamilton,1,21480.0
1000,Hungarian Grand Prix,20,Sebastian,Vettel,2,23111.0
1000,Hungarian Grand Prix,8,Kimi,Räikkönen,3,23150.0
1000,Hungarian Grand Prix,817,Daniel,Ricciardo,4,21364.0
1000,Hungarian Grand Prix,822,Valtteri,Bottas,5,21337.0
1000,Hungarian Grand Prix,842,Pierre,Gasly,6,21684.0
1000,Hungarian Grand Prix,825,Kevin,Magnussen,7,25126.0
1000,Hungarian Grand Prix,4,Fernando,Alonso,8,21795.0
1000,Hungarian Grand Prix,832,Carlos,Sainz,9,21914.0


3. [20 pts] Insert the missing code (e.g: ALO for Alonso) for drivers based on the 'drivers' dataset

In [0]:
# If the 'code' is missing, I take the first three letters of the driver's surname
# Example: Alonso → "ALO", Schumacher → "SCH"
# This approach mimics the format of real F1 codes and ensures every driver has a non-null value.
driver_code_filled = df_drivers.withColumn(
    "code", when(col("code").isNull(), col("surname").substr(1,3)).otherwise(col("code")))
display(driver_code_filled)

driverId,driverRef,number,code,forename,surname,dob,nationality,url
1,hamilton,44,HAM,Lewis,Hamilton,1985-01-07,British,http://en.wikipedia.org/wiki/Lewis_Hamilton
2,heidfeld,\N,HEI,Nick,Heidfeld,1977-05-10,German,http://en.wikipedia.org/wiki/Nick_Heidfeld
3,rosberg,6,ROS,Nico,Rosberg,1985-06-27,German,http://en.wikipedia.org/wiki/Nico_Rosberg
4,alonso,14,ALO,Fernando,Alonso,1981-07-29,Spanish,http://en.wikipedia.org/wiki/Fernando_Alonso
5,kovalainen,\N,KOV,Heikki,Kovalainen,1981-10-19,Finnish,http://en.wikipedia.org/wiki/Heikki_Kovalainen
6,nakajima,\N,NAK,Kazuki,Nakajima,1985-01-11,Japanese,http://en.wikipedia.org/wiki/Kazuki_Nakajima
7,bourdais,\N,BOU,Sébastien,Bourdais,1979-02-28,French,http://en.wikipedia.org/wiki/S%C3%A9bastien_Bourdais
8,raikkonen,7,RAI,Kimi,Räikkönen,1979-10-17,Finnish,http://en.wikipedia.org/wiki/Kimi_R%C3%A4ikk%C3%B6nen
9,kubica,88,KUB,Robert,Kubica,1984-12-07,Polish,http://en.wikipedia.org/wiki/Robert_Kubica
10,glock,\N,GLO,Timo,Glock,1982-03-18,German,http://en.wikipedia.org/wiki/Timo_Glock


4. [20 pts] Who is the youngest and oldest driver for each race? Create a new column called “Age”

In [0]:
# Join drivers and races to compute Age at race date
# Age = number of full years (birthdays) the driver has had by race day
# use floor(), datediff(race_date, dob) / 365 to computes the total days lived, 
# converts it to years, and rounds down to the most recent full year.
df_with_age = df_results.join(df_drivers, "driverId", "inner") \
    .join(df_races.select("raceId", "date"), "raceId", "left") \
    .withColumn("Age", floor(datediff(col("date"), col("dob")) / 365))

# Get min and max Age per race
age_extremes = df_with_age.groupBy("raceId").agg(
    min("Age").alias("youngest_age"),
    max("Age").alias("oldest_age")
)
# Join back to find youngest drivers
youngest = df_with_age.join(age_extremes.withColumnRenamed("raceId", "raceId_ext"), 
                            (df_with_age.raceId == col("raceId_ext")) & 
                            (df_with_age.Age == col("youngest_age")), "inner") \
                      .select(df_with_age["raceId"], "driverId", "forename", "surname", "Age") \
                      .distinct()
# Join back to find oldest drivers
oldest = df_with_age.join(age_extremes.withColumnRenamed("raceId", "raceId_ext"), 
                          (df_with_age.raceId == col("raceId_ext")) & 
                          (df_with_age.Age == col("oldest_age")), "inner") \
                    .select(df_with_age["raceId"], "driverId", "forename", "surname", "Age") \
                    .distinct()
# Display results - youngest and oldest drivers
display(youngest.orderBy("raceId"))
display(oldest.orderBy("raceId"))

raceId,driverId,forename,surname,Age
1,67,Sébastien,Buemi,20
10,153,Jaime,Alguersuari,19
100,32,Christian,Klien,21
1000,840,Lance,Stroll,19
1001,840,Lance,Stroll,19
1002,840,Lance,Stroll,19
1003,840,Lance,Stroll,19
1004,840,Lance,Stroll,19
1005,840,Lance,Stroll,19
1006,840,Lance,Stroll,19


raceId,driverId,forename,surname,Age
1,22,Rubens,Barrichello,36
1,21,Giancarlo,Fisichella,36
10,22,Rubens,Barrichello,37
100,44,Olivier,Panis,37
1000,8,Kimi,Räikkönen,38
1001,8,Kimi,Räikkönen,38
1002,8,Kimi,Räikkönen,38
1003,8,Kimi,Räikkönen,38
1004,8,Kimi,Räikkönen,38
1005,8,Kimi,Räikkönen,39


5. [20 pts] For a given race, which driver has the most wins and losses?

In [0]:
# Join race dates into results
results_with_dates = df_results.join(df_races.select("raceId", "date"), on="raceId", how="inner") \
                               .select("raceId", "driverId", "positionOrder", "date")

# Define time-aware window per driver by using a window specification
window_spec = Window.partitionBy("driverId").orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Calculate cumulative wins and losses up to each race
driver_stats = results_with_dates.withColumn(
    "Cumulative_Wins",
    F.sum(F.when(F.col("positionOrder") == 1, 1).otherwise(0)).over(window_spec)
).withColumn(
    "Cumulative_Losses",
    F.sum(F.when((F.col("positionOrder") != 1) & (F.col("positionOrder").isNotNull()), 1).otherwise(0)).over(window_spec)
)
# Get the driver with the most cumulative wins per race
driver_stats_alias = driver_stats.alias("ds")
most_wins_per_race = driver_stats.groupBy("raceId").agg(
    F.max("Cumulative_Wins").alias("Max_Wins")
).alias("mw").join(driver_stats_alias,
                   (driver_stats_alias["Cumulative_Wins"] == F.col("mw.Max_Wins")) & 
                   (driver_stats_alias["raceId"] == F.col("mw.raceId"))
).select("ds.raceId", "ds.driverId", "ds.Cumulative_Wins", "ds.Cumulative_Losses")
# Get the driver with the most cumulative losses per race
most_losses_per_race = driver_stats.groupBy("raceId").agg(
    F.max("Cumulative_Losses").alias("Max_Losses")
).alias("ml").join(driver_stats_alias,
                   (driver_stats_alias["Cumulative_Losses"] == F.col("ml.Max_Losses")) & 
                   (driver_stats_alias["raceId"] == F.col("ml.raceId"))
).select("ds.raceId", "ds.driverId", "ds.Cumulative_Wins", "ds.Cumulative_Losses")
# Display results - most wins and losses per race
display(most_wins_per_race.orderBy("raceId"))
display(most_losses_per_race.orderBy("raceId"))

raceId,driverId,Cumulative_Wins,Cumulative_Losses
1,4,21,103
10,4,21,112
100,30,80,126
1000,1,67,153
1001,1,67,154
1002,1,68,154
1003,1,69,154
1004,1,70,154
1005,1,71,154
1006,1,71,155


raceId,driverId,Cumulative_Wins,Cumulative_Losses
1,22,9,263
10,22,9,272
100,22,7,185
1000,4,32,273
1001,4,32,274
1002,4,32,275
1003,4,32,276
1004,4,32,277
1005,4,32,278
1006,4,32,279


6. [10 pts] Continue exploring the data by answering your own question.

In [0]:
# My question: Which driver has completed the most races in their career?

# Filter for races the driver actually finished (has a position)
# This removes Did Not Finish and disqualified entries from the dataset by using isNonNull()
completed_races = df_results.filter(col("position").isNotNull())
# Count how many races each driver has completed by using count()
race_counts = completed_races.groupBy("driverId").agg(count("raceId").alias("races_completed"))
# Join with driver names
driver_names = df_drivers.select("driverId", "forename", "surname")
most_active_drivers = race_counts.join(driver_names, "driverId") \
    .orderBy(col("races_completed").desc())
# Display the top 10 drivers who have completed the most races
display(most_active_drivers.limit(10))

driverId,races_completed,forename,surname
4,370,Fernando,Alonso
8,352,Kimi,Räikkönen
22,326,Rubens,Barrichello
1,322,Lewis,Hamilton
18,309,Jenson,Button
30,308,Michael,Schumacher
20,300,Sebastian,Vettel
13,271,Felipe,Massa
119,257,Riccardo,Patrese
15,256,Jarno,Trulli


**ANSWER:** Based on the output above, the driver who has completed the most races is **Fernando Alonso**, with a total of **370 races completed**.