In [0]:
# ========================================================
# F1 Data Analysis Assignment using PySpark on Databricks
# Data is loaded from AWS S3: s3://columbia-gr5069-main/raw/
# ========================================================

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    avg,
    col,
    datediff,
    lit,
    regexp_replace,
    row_number,
    substring,
    to_date,
    upper,
    when,
)
from pyspark.sql.window import Window

# Databricks normally provides a ready-made session; getOrCreate() picks that one up
# and only builds a new session (named "F1_Assignment") when none exists yet.
spark = (
    SparkSession.builder
    .appName("F1_Assignment")
    .getOrCreate()
)

# -------------------------------
# 1. Data Loading (from AWS S3)
# -------------------------------
# Every raw file is read the same way: first row is the header, column types are inferred.
csv_reader = spark.read.options(header=True, inferSchema=True)

df_laptimes = csv_reader.csv('s3://columbia-gr5069-main/raw/lap_times.csv')
df_drivers = csv_reader.csv('s3://columbia-gr5069-main/raw/drivers.csv')
df_pitstops = csv_reader.csv('s3://columbia-gr5069-main/raw/pit_stops.csv')
df_results = csv_reader.csv('s3://columbia-gr5069-main/raw/results.csv')
df_races = csv_reader.csv('s3://columbia-gr5069-main/raw/races.csv')

print("Data loaded successfully!")

# -------------------------------
# Q1: Calculate the average pit stop time (in milliseconds) for each driver in each race [10 pts]
# -------------------------------
# One output row per (raceId, driverId): the mean of that driver's pit-stop durations in that race.
pit_duration_ms = col("milliseconds")
q1_avg_pit = (
    df_pitstops
    .groupBy("raceId", "driverId")
    .agg(avg(pit_duration_ms).alias("avg_pit_time"))
)

print("Q1: Average pit stop time (in ms) for each driver in each race:")
# Sorted only for display; q1_avg_pit itself stays unordered.
q1_avg_pit.sort("raceId", "driverId").show(5)

# -------------------------------
# Q2: Rank the average pit stop time per race and mark the race winner [20 pts]
# -------------------------------
# Winner of each race: the result row with positionOrder == 1. driverId is exposed as
# winner_driverId so it does not collide with the driverId of the pit-stop averages.
winners = (
    df_results
    .where(col("positionOrder") == 1)
    .select("raceId", col("driverId").alias("winner_driverId"))
)

# Attach the winner to every (race, driver) average and flag the rows where both ids match.
# The inner join keeps only races that have a winner row.
is_race_winner = col("driverId") == col("winner_driverId")
q2 = (
    q1_avg_pit
    .join(winners, on="raceId", how="inner")
    .withColumn("is_winner", when(is_race_winner, lit(1)).otherwise(lit(0)))
)

# Within each race, number the drivers from the shortest average pit stop upwards.
# row_number() hands out consecutive ranks, so equal averages still get distinct ranks.
window_spec = Window.partitionBy("raceId").orderBy("avg_pit_time")
pit_rank_expr = row_number().over(window_spec)
q2_ranked = q2.withColumn("pit_rank", pit_rank_expr)

print("Q2: Ranking of average pit stop times per race and marking winners:")
q2_report = q2_ranked.select("raceId", "driverId", "avg_pit_time", "pit_rank", "is_winner")
q2_report.sort("raceId", "pit_rank").show(5)

# -------------------------------
# Q3: Fill in missing driver codes in the drivers dataset (e.g., fill 'ALO' for Alonso) [20 pts]
# -------------------------------
# A code is treated as missing when it is a real null OR the literal placeholder \N.
# The CSV reader above sets no nullValue, so such a placeholder would arrive as a plain
# string and an isNull() check alone would never see it.
# NOTE(review): \N is the usual marker in Ergast-style exports - confirm for this bucket.
code_is_missing = col("code").isNull() | (col("code") == "\\N")

# Derive the replacement from the surname: drop whitespace (so multi-word surnames do not
# yield a code containing a blank), take the first three characters, upper-case them.
# This reproduces the previously hard-coded cases (Alonso -> ALO, Hamilton -> HAM,
# Vettel -> VET) and also covers every other driver without a code.
# NOTE(review): official codes are not always the first three letters of the surname;
# add explicit overrides ahead of this rule if exact official codes are required.
derived_code = upper(substring(regexp_replace(col("surname"), r"\s+", ""), 1, 3))

df_drivers_filled = df_drivers.withColumn(
    "code",
    when(code_is_missing, derived_code).otherwise(col("code"))
)

print("Q3: Driver codes after filling missing values:")
df_drivers_filled.select("driverId", "forename", "surname", "code").show(5)

# -------------------------------
# Q4: For each race, determine the youngest and oldest driver by creating a new column "Age" [20 pts]
# -------------------------------
# Parse the race date and each driver's date of birth into date columns.
races_date = df_races.select("raceId", to_date(col("date"), "yyyy-MM-dd").alias("race_date"))
drivers_date = df_drivers_filled.select("driverId", to_date(col("dob"), "yyyy-MM-dd").alias("dob"))

# One row per race result, carrying the race date and the driver's date of birth.
race_driver = df_results.join(races_date, "raceId").join(drivers_date, "driverId")
# Age at race time in fractional years: elapsed days divided by 365.25.
# Age is null whenever either of the two dates is null.
race_driver = race_driver.withColumn("Age", datediff(col("race_date"), col("dob")) / 365.25)

# Youngest driver per race. Spark's default ascending sort places nulls FIRST, so a row
# with an unknown Age would otherwise be selected as the "youngest"; nulls are pushed to
# the end explicitly. driverId is a secondary key so equal ages resolve the same way on
# every run.
window_youngest = Window.partitionBy("raceId").orderBy(col("Age").asc_nulls_last(), col("driverId"))
youngest = race_driver.withColumn("rn", row_number().over(window_youngest)).filter(col("rn") == 1)

# Oldest driver per race. Descending order already sorts nulls last; it is spelled out
# here so both windows visibly follow the same rule.
window_oldest = Window.partitionBy("raceId").orderBy(col("Age").desc_nulls_last(), col("driverId"))
oldest = race_driver.withColumn("rn", row_number().over(window_oldest)).filter(col("rn") == 1)

print("Q4: Youngest driver in each race (with Age):")
youngest.select("raceId", "driverId", "Age").orderBy("raceId").show(5)
print("Q4: Oldest driver in each race (with Age):")
oldest.select("raceId", "driverId", "Age").orderBy("raceId").show(5)

# -------------------------------
# Q5: For each race, which driver has the most wins and losses? [20 pts]
# -------------------------------
# Wins per (race, driver): number of result rows with positionOrder == 1.
wins = df_results.filter(col("positionOrder") == 1) \
    .groupBy("raceId", "driverId") \
    .count() \
    .withColumnRenamed("count", "wins")

# Losses per (race, driver): number of result rows with positionOrder > 1.
losses = df_results.filter(col("positionOrder") > 1) \
    .groupBy("raceId", "driverId") \
    .count() \
    .withColumnRenamed("count", "losses")

# The outer join keeps drivers that appear on only one side; their missing count becomes 0.
# The fill is limited to the two count columns so no other column is touched.
win_loss = wins.join(losses, on=["raceId", "driverId"], how="outer") \
    .na.fill(0, subset=["wins", "losses"])

# row_number() keeps exactly one driver per race even when several share the top count.
# Without a secondary sort key the surviving row is arbitrary and may differ between runs,
# so ties are broken by the lowest driverId.
# NOTE(review): counts are taken within a single race, so ties on "losses" are expected
# to be the norm (presumably one result row per driver per race) - confirm whether
# career-level wins/losses were intended for this question.
window_win = Window.partitionBy("raceId").orderBy(col("wins").desc(), col("driverId"))
most_wins = win_loss.withColumn("win_rank", row_number().over(window_win)).filter(col("win_rank") == 1)

window_loss = Window.partitionBy("raceId").orderBy(col("losses").desc(), col("driverId"))
most_losses = win_loss.withColumn("loss_rank", row_number().over(window_loss)).filter(col("loss_rank") == 1)

print("Q5: Driver with the most wins in each race:")
most_wins.select("raceId", "driverId", "wins").orderBy("raceId").show(5)
print("Q5: Driver with the most losses in each race:")
most_losses.select("raceId", "driverId", "losses").orderBy("raceId").show(5)

# -------------------------------
# Q6: Custom Question – For example: Calculate the average starting grid position for each driver,
# where a lower value indicates a better starting position [10 pts]
# -------------------------------
# Only real grid slots (1, 2, 3, ...) are averaged. A grid value of 0 is not a position
# ahead of pole; averaging it in would rank drivers with many 0 entries as the "best"
# starters.
# NOTE(review): in Ergast-style results data 0 marks a pit-lane start / no grid slot -
# confirm that this bucket follows the same convention.
# driverId is a secondary sort key so drivers with equal averages are listed in a stable order.
avg_grid = (
    df_results
    .filter(col("grid") > 0)
    .groupBy("driverId")
    .agg(avg("grid").alias("avg_grid_position"))
    .orderBy("avg_grid_position", "driverId")
)

print("Q6: Average starting grid position for each driver (lower is better):")
avg_grid.show(10)

# -------------------------------
# Final Instructions:
# Save this Notebook and commit it to your GitHub classroom repository.
# It is recommended to commit each question separately to maintain a clear commit history.
# -------------------------------

print("F1 Data Analysis Assignment completed!")


Data loaded successfully!
Q1: Average pit stop time (in ms) for each driver in each race:
+------+--------+------------+
|raceId|driverId|avg_pit_time|
+------+--------+------------+
|   841|       1|     23213.0|
|   841|       2|     24046.0|
|   841|       3|     23716.0|
|   841|       4|     24055.0|
|   841|       5|     24865.0|
+------+--------+------------+
only showing top 5 rows

Q2: Ranking of average pit stop times per race and marking winners:
+------+--------+------------------+--------+---------+
|raceId|driverId|      avg_pit_time|pit_rank|is_winner|
+------+--------+------------------+--------+---------+
|   841|      18|20950.333333333332|       1|        0|
|   841|       1|           23213.0|       2|        0|
|   841|      20|           23319.5|       3|        1|
|   841|     815|           23438.0|       4|        0|
|   841|       3|           23716.0|       5|        0|
+------+--------+------------------+--------+---------+
only showing top 5 rows

Q3: Drive