In [0]:
# Gina Wang
# Dr. Morales & Nana
# QMSSGR5069 Applied Data Sciences - Take Home Exercise #2
# March 24, 2025

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, round, concat_ws, trim, when, substring, upper, to_date, year, month, dayofmonth, min as spark_min, max as spark_max, count, lit
from pyspark.sql.window import Window
from pyspark.sql.functions import rank
from pyspark.sql import functions as F

In [0]:
# Question 1: What was the average time each driver spent at the pit stop for each race?

# Create (or reuse) the Spark session
spark = SparkSession.builder.appName("F1 Pit Stop Analysis").getOrCreate()

# Load the raw data
pit_stops_df = spark.read.csv("s3://columbia-gr5069-main/raw/pit_stops.csv", header=True, inferSchema=True)
drivers_df = spark.read.csv("s3://columbia-gr5069-main/raw/drivers.csv", header=True, inferSchema=True)

# Step 1: Average pit stop duration (milliseconds, rounded to 2 decimals) per driver per race
avg_pit_df = (
    pit_stops_df.groupBy("raceId", "driverId")
                .agg(round(avg("milliseconds"), 2).alias("avg_pitstop_time_ms"))
)

# Step 2: Build a "Forename Surname" display name per driver.
# This goes into its own DataFrame so that drivers_df stays the full raw table for later
# cells instead of being overwritten with a three-column projection.
driver_names_df = drivers_df.select(
    "driverId",
    concat_ws(" ", trim(col("forename")), trim(col("surname"))).alias("driver_name"),
)

# Step 3: Attach driver names; within each race, list the quickest average first
avg_pit_with_names = (
    avg_pit_df.join(driver_names_df, on="driverId", how="left")
              .select("raceId", "driver_name", "avg_pitstop_time_ms")
              .orderBy("raceId", "avg_pitstop_time_ms")
)

# Step 4: Show results
avg_pit_with_names.show(100, truncate=False)

In [0]:
# Question 2: Rank the average time spent at the pit stop in order of who won each race
#
# This cell relies on pit_stops_df and drivers_df loaded in Question 1. Every function used
# here is already imported in the setup cell at the top, so no imports are repeated.

# Load the additional data
results_df = spark.read.csv("s3://columbia-gr5069-main/raw/results.csv", header=True, inferSchema=True)
races_df = spark.read.csv("s3://columbia-gr5069-main/raw/races.csv", header=True, inferSchema=True)

# Step 1: Average pit stop time per driver per race
avg_pit_df = (
    pit_stops_df.groupBy("raceId", "driverId")
    .agg(round(avg("milliseconds"), 2).alias("avg_pitstop_time_ms"))
)

# Step 2: Join with results to get finishing positions (positionOrder 1 = winner)
joined_df = avg_pit_df.join(
    results_df.select("raceId", "driverId", "positionOrder"),
    on=["raceId", "driverId"],
    how="left",
)

# Step 3: Join with drivers to get full names
joined_df = (
    joined_df.join(
        drivers_df.select("driverId", "forename", "surname"),
        on="driverId",
        how="left",
    )
    .withColumn("driver_name", concat_ws(" ", trim(col("forename")), trim(col("surname"))))
)

# Step 4: Join with race name
joined_df = joined_df.join(
    races_df.select("raceId", col("name").alias("race_name")),
    on="raceId",
    how="left",
)

# Step 5: Windows within each race.
#   window_spec         -> finishing order (a null positionOrder, i.e. no result row, goes last)
#   pitstop_window_spec -> average pit stop time, quickest first
window_spec = Window.partitionBy("raceId").orderBy(col("positionOrder").asc_nulls_last())
pitstop_window_spec = Window.partitionBy("raceId").orderBy(col("avg_pitstop_time_ms").asc_nulls_last())

# Step 6: Add both ranks. Rows come from avg_pit_df, so only drivers who made at least one
# pit stop are ranked; finishing_rank can therefore differ from positionOrder.
ranked_df = (
    joined_df.withColumn("finishing_rank", rank().over(window_spec))
             .withColumn("pitstop_rank", rank().over(pitstop_window_spec))
)

# Step 7: Final selection, ordered winner-first within each race
final_df = (
    ranked_df.select("raceId", "race_name", "driverId", "driver_name",
                     "avg_pitstop_time_ms", "pitstop_rank", "positionOrder", "finishing_rank")
    .orderBy("raceId", "finishing_rank")
)

# Step 8: Show results
final_df.show(100, truncate=False)

In [0]:
# Question 3: Insert the missing code (e.g., ALO for Alonso) for drivers based on the 'drivers' dataset

# Load the drivers dataset
drivers_df = spark.read.csv("s3://columbia-gr5069-main/raw/drivers.csv", header=True, inferSchema=True)

# A code is "missing" when it is null, blank, or the literal "\N" placeholder used in the raw CSV.
# Spark's CSV reader only turns empty fields into null by default, so "\N" arrives as a string.
code_is_missing = col("code").isNull() | trim(col("code")).isin("", "\\N")

# Fallback code: the first three *letters* of the surname, upper-cased. Spaces, apostrophes,
# hyphens and dots are stripped first, so "de Cesaris" yields "DEC" rather than "DE "
# (with a trailing space). This is an approximation, not the official FIA abbreviation.
surname_letters = F.regexp_replace(trim(col("surname")), r"[^\p{L}]", "")
derived_code = upper(substring(surname_letters, 1, 3))

drivers_df_filled = drivers_df.withColumn(
    "code",
    when(code_is_missing, derived_code).otherwise(col("code")),
)

# Show results
drivers_df_filled.select("driverId", "forename", "surname", "code").show(100, truncate=False)

In [0]:
# Question 4: Who is the youngest and oldest driver for each race? Create a new column called "Age".

# Load each dataset, parsing its date column into DateType as part of the load
drivers_df = (
    spark.read.csv("s3://columbia-gr5069-main/raw/drivers.csv", header=True, inferSchema=True)
         .withColumn("dob", to_date(col("dob")))
)
results_df = spark.read.csv("s3://columbia-gr5069-main/raw/results.csv", header=True, inferSchema=True)
races_df = (
    spark.read.csv("s3://columbia-gr5069-main/raw/races.csv", header=True, inferSchema=True)
         .withColumn("date", to_date(col("date")))
)

# Attach driver details and the race's date/name to every result row
race_info = races_df.select("raceId", "date", "name")
joined_df = (
    results_df
    .join(drivers_df, on="driverId", how="left")
    .join(race_info, on="raceId", how="left")
)

# Age on race day, in whole years: the calendar-year gap, minus one if the driver's
# birthday (month/day) has not yet been reached by the race date in that year
race_day = col("date")
birth_day = col("dob")
birthday_passed = (
    (month(race_day) > month(birth_day))
    | ((month(race_day) == month(birth_day)) & (dayofmonth(race_day) >= dayofmonth(birth_day)))
)
years_apart = year(race_day) - year(birth_day)
joined_df = joined_df.withColumn("Age", when(birthday_passed, years_apart).otherwise(years_apart - 1))

# Per-race youngest (min) and oldest (max) ages, broadcast onto every row of that race
window_spec = Window.partitionBy("raceId")
result_df = (
    joined_df
    .withColumn("min_age", spark_min("Age").over(window_spec))
    .withColumn("max_age", spark_max("Age").over(window_spec))
)

# Keep only the extreme-age drivers; a row matching the minimum is labeled "Youngest",
# every other surviving row "Oldest"
is_youngest = col("Age") == col("min_age")
is_oldest = col("Age") == col("max_age")
extremes_df = (
    result_df
    .filter(is_youngest | is_oldest)
    .withColumn("Age_Category", when(is_youngest, "Youngest").otherwise("Oldest"))
)

# Final table, ordered by race and then category
output_columns = ["raceId", "name", "date", "driverId", "forename", "surname", "Age", "Age_Category"]
final_df = extremes_df.select(*output_columns).orderBy("raceId", "Age_Category")

final_df.show(100, truncate=False)

In [0]:
# Question 5: For a given race, which driver has the most wins and losses?
#
# Interpretation: for a chosen target race, tally each driver's results in every race held
# *before* it (by race date): wins, finishes without a win, and non-finishes (DNFs).

# Load data
results_df = spark.read.csv("s3://columbia-gr5069-main/raw/results.csv", header=True, inferSchema=True)
status_df = spark.read.csv("s3://columbia-gr5069-main/raw/status.csv", header=True, inferSchema=True)
drivers_df = spark.read.csv("s3://columbia-gr5069-main/raw/drivers.csv", header=True, inferSchema=True)
races_df = spark.read.csv("s3://columbia-gr5069-main/raw/races.csv", header=True, inferSchema=True)

# Step 1: statusIds that count as NOT completing the race.
# "Finished" and lapped-but-classified results ("+1 Lap", "+2 Laps", ...) both mean the driver
# completed the race, so only the remaining statuses are treated as DNFs.
# A plain .collect() is used instead of .rdd, which is unavailable on Spark Connect and
# Databricks shared-access clusters.
dnf_status_ids = [
    row["statusId"]
    for row in status_df.filter(
        (col("status") != "Finished") & ~col("status").rlike(r"^\+\d+ Laps?$")
    ).select("statusId").collect()
]

# Step 2: Choose a specific raceId (you can change this) and look up its date
target_race_id = 841
race_dates_df = races_df.select("raceId", to_date(col("date")).alias("race_date"))
target_race = race_dates_df.filter(col("raceId") == target_race_id).first()
if target_race is None or target_race["race_date"] is None:
    raise ValueError(f"raceId {target_race_id} not found (or has no date) in races.csv")
target_race_date = target_race["race_date"]

# Step 3: Keep only results from races held before the target race.
# raceId is not assigned in chronological order in this dataset (e.g. raceId 1 is a 2009 race),
# so the cutoff must use the race date rather than `raceId < target_race_id`.
previous_races_df = (
    results_df.join(race_dates_df, on="raceId", how="inner")
              .filter(col("race_date") < lit(target_race_date))
)

# Step 4: 0/1 flags per result row
is_dnf = col("statusId").isin(dnf_status_ids)
labeled_df = (
    previous_races_df.withColumn("win", when(col("positionOrder") == 1, 1).otherwise(0))
                     .withColumn("not_completed", when(is_dnf, 1).otherwise(0))
                     .withColumn(
                         "completed_not_won",
                         when((col("positionOrder") > 1) & ~is_dnf, 1).otherwise(0)
                     )
)

# Step 5: Aggregate stats by driverId (summing the 0/1 flags counts the matching rows)
summary_df = (
    labeled_df.groupBy("driverId")
              .agg(
                  F.sum("win").alias("wins"),
                  F.sum("completed_not_won").alias("completed_not_won"),
                  F.sum("not_completed").alias("not_completed"),
                  count(lit(1)).alias("total_races"),
              )
)

# Optional: restrict the table to drivers entered in the target race itself
only_target_race_drivers = False
if only_target_race_drivers:
    entrants_df = results_df.filter(col("raceId") == target_race_id).select("driverId").distinct()
    summary_df = summary_df.join(entrants_df, on="driverId", how="inner")

# Step 6: Join with driver names; most wins first, then most DNFs, driverId as a stable tiebreak
final_df = (
    summary_df.join(drivers_df.select("driverId", "forename", "surname"), on="driverId", how="left")
              .select("driverId", "forename", "surname", "wins", "completed_not_won", "not_completed", "total_races")
              .orderBy(col("wins").desc(), col("not_completed").desc(), col("driverId"))
)

# Step 7: Show result
final_df.show(100, truncate=False)

In [0]:
# Question 6: Continue exploring the data by answering your own question.

# My question is "Which driver had the fastest average pit stop time in the 2021 season?"

# Upper bound on what counts as a genuine pit stop. A handful of extremely long entries
# (e.g. red-flag stoppages spent in the pit lane) would otherwise dominate a driver's mean.
# NOTE(review): 60 s also drops unusually long repair stops; adjust if those should count.
MAX_PIT_STOP_MS = 60_000

# Load datasets
races_df = spark.read.csv("s3://columbia-gr5069-main/raw/races.csv", header=True, inferSchema=True)
pit_stops_df = spark.read.csv("s3://columbia-gr5069-main/raw/pit_stops.csv", header=True, inferSchema=True)
drivers_df = spark.read.csv("s3://columbia-gr5069-main/raw/drivers.csv", header=True, inferSchema=True)

# Step 1: Filter races for 2021 season
races_2021 = (
    races_df.withColumn("date", col("date").cast("date"))
            .filter(year(col("date")) == 2021)
            .select("raceId", "name", "date")
)

# Step 2: Join races with pit stops to get only 2021 pit stops
pit_stops_2021 = pit_stops_df.join(races_2021, on="raceId", how="inner")

# Step 3: Per-driver statistics over genuine stops only. The median is reported alongside the
# mean because it is robust to the odd slow stop; num_stops shows how much data backs each figure.
avg_pitstop_by_driver = (
    pit_stops_2021.filter(col("milliseconds") <= MAX_PIT_STOP_MS)
                  .groupBy("driverId")
                  .agg(
                      round(avg("milliseconds"), 2).alias("avg_pitstop_time_ms"),
                      F.expr("percentile_approx(milliseconds, 0.5)").alias("median_pitstop_time_ms"),
                      count(lit(1)).alias("num_stops"),
                  )
)

# Step 4: Add driver names, fastest average first
final_df = (
    avg_pitstop_by_driver.join(drivers_df.select("driverId", "forename", "surname"), on="driverId", how="left")
                         .withColumn("driver_name", concat_ws(" ", trim(col("forename")), trim(col("surname"))))
                         .select("driverId", "driver_name", "avg_pitstop_time_ms",
                                 "median_pitstop_time_ms", "num_stops")
                         .orderBy("avg_pitstop_time_ms")
)

# Step 5: Show result
final_df.show(50, truncate=False)
