In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    avg,
    coalesce,
    col,
    floor,
    lit,
    min,
    months_between,
    regexp_replace,
    row_number,
    substring,
    sum,
    to_date,
    upper,
    when,
)
from pyspark.sql.window import Window

# Raw tables live side by side under one S3 prefix. They are read with a header row and no
# schema inference, so every column arrives as a string.
RAW_DATA_PREFIX = "s3://columbia-gr5069-main/raw/"


def load_raw_table(table_name):
    """Read ``<RAW_DATA_PREFIX><table_name>.csv`` (header row, all columns as strings)."""
    return spark.read.csv(RAW_DATA_PREFIX + table_name + ".csv", header=True)


drivers = load_raw_table("drivers")
results = load_raw_table("results")
races = load_raw_table("races")
pit_stops = load_raw_table("pit_stops")

#### Question 1: What was the average time each driver spent at the pit stop for each race?

In [0]:
# Mean pit-stop duration (in ms) for every (race, driver) pair.
# `milliseconds` is a string column (no schema was inferred); avg() relies on Spark's
# implicit numeric cast.
avg_pit_time = (
    pit_stops
    .groupBy(col("raceId"), col("driverId"))
    .agg(avg(col("milliseconds")).alias("avg_pit_time_ms"))
)
display(avg_pit_time)

#### Question 2: Rank the average time spent at the pit stop in order of who won each race

In [0]:
# Join pit-stop averages with each driver's finishing position.
#
# Drivers who did not finish are excluded by keeping only results with statusId == 1, the same
# "finished" marker used when counting past finishes for Question 5. A positionOrder.isNotNull()
# filter cannot do this: the CSVs are read as strings and missing values are the literal "\N",
# not SQL nulls.
# NOTE(review): lapped-but-classified finishers may carry other statusIds; confirm against the
# status table if they should be included.
results_filtered = (
    results
    .filter(col("statusId").cast("int") == 1)
    # Cast so finishing positions sort numerically (otherwise "10" sorts before "2")
    .withColumn("positionOrder", col("positionOrder").cast("int"))
)
joined = avg_pit_time.join(
    results_filtered.select("raceId", "driverId", "positionOrder"),
    on=["raceId", "driverId"],
    how="inner",
)

# Within each race, list average pit-stop time in finishing order (winner first)
ranked_pit_stops = joined.orderBy(col("raceId").cast("int"), col("positionOrder"))

display(ranked_pit_stops)

# How I handled drivers who did not finish the race: to keep the comparison fair and meaningful, I excluded drivers who did not finish the race from the ranking.

In [0]:
# Restrict to race winners only (positionOrder = 1). The winner's id is renamed so that,
# after joining on raceId, it can be compared against the pit-stop table's driverId.
winners = (
    results
    .where(col("positionOrder") == 1)
    .select(col("raceId"), col("driverId").alias("winner_driverId"))
)

joined = avg_pit_time.join(winners, "raceId")

# Keep each winner's own pit-stop average, fastest first
winner_pit_times = (
    joined
    .where(col("driverId") == col("winner_driverId"))
    .orderBy("avg_pit_time_ms")
)

display(winner_pit_times)

#### Question 3: Fill in the missing driver code (e.g., ALO for Alonso) for each driver in the `drivers` dataset

In [0]:
# Fill in missing driver codes. Missing values in the raw CSV are the literal string "\N";
# for those, build a code from the first three letters of the surname, uppercased.
# Non-letter characters (spaces, apostrophes, hyphens) are stripped first, so a multi-word
# surname such as "de Cesaris" yields "DEC" instead of the malformed "DE ".
# Re-running this cell is harmless: once filled, no code equals "\N" any more.
drivers = drivers.withColumn(
    "code",
    when(
        col("code") == "\\N",
        upper(substring(regexp_replace(col("surname"), "[^\\p{L}]", ""), 1, 3)),
    ).otherwise(col("code")),
)

display(drivers)

#### Question 4: Who is the youngest and oldest driver for each race? Create a new column called “Age”

In [0]:
# Attach the race date and each driver's name / date of birth to every result
race_results = (
    results
    .join(races.select("raceId", "date"), on="raceId")
    .join(drivers.select("driverId", "dob", "forename", "surname"), on="driverId")
)

# Age in completed years on race day (how many birthdays the driver has had)
race_results = race_results.withColumn(
    "Age",
    floor(months_between(col("date"), col("dob")) / 12)
)

# Rank by exact age (fractional months), not the floored Age, so two drivers of the same
# whole-year age are still ordered correctly. nulls_last keeps a driver with an unparseable
# dob from being reported as the youngest (Spark sorts nulls first in ascending order).
# driverId is a final tiebreak so the pick is deterministic.
age_in_months = months_between(col("date"), col("dob"))
window_young = Window.partitionBy("raceId").orderBy(age_in_months.asc_nulls_last(), col("driverId"))
window_old = Window.partitionBy("raceId").orderBy(age_in_months.desc_nulls_last(), col("driverId"))
youngest = race_results.withColumn("rn", row_number().over(window_young)).filter("rn = 1")
oldest = race_results.withColumn("rn", row_number().over(window_old)).filter("rn = 1")

# The youngest and oldest driver in each race
display(youngest.select("raceId", "forename", "surname", "Age"))
display(oldest.select("raceId", "forename", "surname", "Age"))

#### Question 5: For a given race, which driver has the most wins and losses?

In [0]:
# For every (race, driver): how many wins and non-winning finishes the driver had in all
# EARLIER races.
#
# The CSVs were read without a schema, so year/round are strings. Comparing them as strings
# would treat round "10" as earlier than round "2". Build an integer chronological key
# instead: year * 100 + round (assumes fewer than 100 rounds per season).
results_with_race_info = (
    results
    .join(races.select("raceId", "year", "round"), on="raceId")
    .withColumn("race_seq", col("year").cast("int") * 100 + col("round").cast("int"))
    # 1 if this result is a win
    .withColumn("is_win", (col("positionOrder").cast("int") == 1).cast("int"))
    # 1 if the driver finished (statusId == 1) without winning
    .withColumn(
        "is_loss",
        ((col("positionOrder").cast("int") != 1) & (col("statusId").cast("int") == 1)).cast("int"),
    )
)

# Frame = all of the same driver's rows with a strictly smaller race_seq, i.e. prior races.
# This replaces a driver-by-driver self-join with a single cumulative window.
prior_races = (
    Window.partitionBy("driverId")
    .orderBy("race_seq")
    .rangeBetween(Window.unboundedPreceding, -1)
)

driver_history_per_race = (
    results_with_race_info
    # A debut race has an empty frame, where sum() gives null; report 0 instead
    .withColumn("past_wins", coalesce(sum("is_win").over(prior_races), lit(0)))
    .withColumn("past_losses", coalesce(sum("is_loss").over(prior_races), lit(0)))
    .select("raceId", "driverId", "past_wins", "past_losses")
    # If a driver has several rows in one race, they share the same frame and thus the same
    # totals; keep one row per (race, driver) instead of multiplying the history.
    .dropDuplicates(["raceId", "driverId"])
    .orderBy(col("raceId").cast("int"), col("past_wins").desc())
)

display(driver_history_per_race)

In [0]:
# Within each race, pick the driver with the most prior wins and the driver with the most
# prior non-winning finishes.
# Ties are likely (e.g. several drivers with 0 past wins), and row_number() alone would pick
# an arbitrary, run-to-run unstable driver among them. driverId is a secondary key to make
# the choice deterministic. nulls_last keeps missing totals from ever ranking first.
wins_window = Window.partitionBy("raceId").orderBy(
    col("past_wins").desc_nulls_last(), col("driverId").cast("int").asc()
)
losses_window = Window.partitionBy("raceId").orderBy(
    col("past_losses").desc_nulls_last(), col("driverId").cast("int").asc()
)

# Rank drivers by past_wins and past_losses per race
ranked_wins = driver_history_per_race.withColumn("win_rank", row_number().over(wins_window))
ranked_losses = driver_history_per_race.withColumn("loss_rank", row_number().over(losses_window))

# Top driver for each measure in each race
most_wins_per_race = ranked_wins.filter(col("win_rank") == 1).select("raceId", "driverId", "past_wins")
most_losses_per_race = ranked_losses.filter(col("loss_rank") == 1).select("raceId", "driverId", "past_losses")

print("Driver with most wins before each race:")
display(most_wins_per_race)

print("Driver with most losses before each race:")
display(most_losses_per_race)

#### Question 6: Continue exploring the data by answering your own question. What was each driver's age when they achieved their best personal performance (shortest race time)?

In [0]:
# Question 6: each driver's age at their fastest (shortest) recorded race time.
# NOTE(review): raw race times from different circuits/eras are not directly comparable;
# this answers the question as posed.

# milliseconds is a string column (no schema inferred), and missing times are the literal
# "\N" rather than null, so isNotNull() alone does not remove them. Drop "\N", then cast so
# the fastest time is found numerically. As strings, "10000000" < "5500000".
timed_results = (
    results
    .filter(col("milliseconds") != "\\N")
    .withColumn("milliseconds", col("milliseconds").cast("long"))
    .filter(col("milliseconds").isNotNull())
)

# Attach race date and date of birth, then age in completed years on race day.
# Built under its own name so the Question 4 `race_results` is not overwritten.
timed_with_age = (
    timed_results
    .join(races.select("raceId", "date"), on="raceId", how="left")
    .join(drivers.select("driverId", "dob"), on="driverId", how="left")
    .withColumn("Age", floor(months_between(col("date"), col("dob")) / 12))
)

# Per driver: fastest time first. If the best time repeats, keep the earliest race by date,
# with raceId as a final deterministic tiebreak.
best_window = Window.partitionBy("driverId").orderBy(
    col("milliseconds").asc(),
    col("date").asc_nulls_last(),
    col("raceId").cast("int").asc(),
)

best_with_age = (
    timed_with_age
    .withColumn("rn", row_number().over(best_window))
    .filter(col("rn") == 1)
    .select("driverId", "raceId", "milliseconds", "Age")
)
display(best_with_age)