In [0]:
from pyspark.sql.functions import (
    avg,
    coalesce,
    col,
    count,
    datediff,
    expr,
    floor,
    months_between,
    rank,
    regexp_replace,
    row_number,
    to_date,
    upper,
    when,
)
from pyspark.sql.window import Window

In [0]:
# Load the raw tables with identical reader options so column types are
# consistent across tables:
#   - inferSchema: numeric ids / positions become numbers, so joins compare
#     numbers and orderings are numeric rather than lexicographic ("10" < "2").
#   - nullValue: the raw CSVs use the literal string \N for missing values;
#     this turns it into a real null at read time.
RAW_DATA_PATH = 's3://columbia-gr5069-main/raw'
CSV_OPTIONS = {'header': True, 'inferSchema': True, 'nullValue': '\\N'}

df_pit_stops = spark.read.csv(f'{RAW_DATA_PATH}/pit_stops.csv', **CSV_OPTIONS)
df_results = spark.read.csv(f'{RAW_DATA_PATH}/results.csv', **CSV_OPTIONS)
df_drivers = spark.read.csv(f'{RAW_DATA_PATH}/drivers.csv', **CSV_OPTIONS)
df_races = spark.read.csv(f'{RAW_DATA_PATH}/races.csv', **CSV_OPTIONS)
df_status = spark.read.csv(f'{RAW_DATA_PATH}/status.csv', **CSV_OPTIONS)


In [0]:
# Q1 What was the average time each driver spent at the pit stop for each race?
# `milliseconds` is cast to double up front so avg() aggregates numbers.
ms_numeric = col("milliseconds").cast("double")
df_pit_stops = df_pit_stops.withColumn("milliseconds", ms_numeric)

# One row per (race, driver): mean duration over all of that driver's stops.
per_race_driver = df_pit_stops.groupBy("raceId", "driverId")
avg_pit_stop_time = per_race_driver.agg(avg("milliseconds").alias("avg_pit_stop_time"))

display(avg_pit_stop_time)

In [0]:
# Q2 Rank the average time spent at the pit stop in order of who won each race
# I filter out those drivers who did not finish the race
# The filter runs *before* ranking, so ranks run 1..N over finishers only.

# Keep only classified finishers: a non-finisher has no `position` value
# (null, or the literal "\N" if the CSV was read without nullValue).
# NOTE(review): "did not finish" is interpreted as "not classified"; switch to
# a status == "Finished" filter if lapped cars should also be excluded.
df_classified = (
    df_results
    .filter(col("position").isNotNull() & (col("position").cast("string") != "\\N"))
    # Cast so the window below orders numerically (as strings, "10" < "2").
    .withColumn("positionOrder", col("positionOrder").cast("int"))
)

# Join avg_pit_stop_time with the finishers to get finishing order
df_joined = avg_pit_stop_time.join(df_classified, on=["raceId", "driverId"])

# Define window specification to rank based on finishing order
window_spec = Window.partitionBy("raceId").orderBy("positionOrder")

# Rank the average pit stop times based on finishing order
df_ranked = df_joined.withColumn("rank", rank().over(window_spec))

display(df_ranked.select("raceId", "driverId", "avg_pit_stop_time", "positionOrder", "rank"))

In [0]:
# Q3: Insert the missing code (e.g: ALO for Alonso) for drivers based on the 'drivers' dataset

# Replace '\N' with None so missing codes are real nulls
df_drivers_clean = df_drivers.replace('\\N', None)

# Fallback code: first 3 letters of the surname, upper-cased. Non-letter
# characters (spaces, apostrophes, hyphens) are stripped first so a surname
# such as "de Cesaris" yields "DEC" instead of "DE " (with a trailing space).
surname_code = upper(regexp_replace(col("surname"), r"[^\p{L}]", "").substr(1, 3))

# Keep the original code where it exists, otherwise use the fallback.
# A single column expression replaces the filter + self-join round trip.
df_drivers_updated = df_drivers_clean.withColumn(
    "code", coalesce(col("code"), surname_code)
)

# Display the result
display(df_drivers_updated)

In [0]:
# Q4: Who is the youngest and oldest driver for each race? Create a new column called “Age”

# Parse dates into local frames; the shared df_drivers / df_races are left untouched
df_drivers_dob = df_drivers.replace('\\N', None).withColumn("dob", to_date("dob"))
df_races_dated = df_races.withColumn("date", to_date("date"))

# Join drivers, results, and races
df_all = (
    df_results.select("raceId", "driverId")
    .join(df_races_dated.select("raceId", "date", "name"), on="raceId", how="left")
    .join(df_drivers_dob.select("driverId", "surname", "dob"), on="driverId", how="left")
)

# Age in completed years on race day. months_between is calendar-aware, whereas
# datediff / 365.25 under-counts around birthdays: born 2001-01-01, racing on
# 2022-01-01 is 7670 days -> 20.999 -> 20, but the driver is 21.
df_all = df_all.withColumn("Age", floor(months_between(col("date"), col("dob")) / 12))

# Rank by date of birth rather than the integer Age so two drivers who are both
# "19" are still ordered correctly. Missing dob is pushed last so it can never
# be reported as youngest/oldest; driverId makes the pick deterministic on ties.
window_young = Window.partitionBy("raceId").orderBy(col("dob").desc_nulls_last(), col("driverId"))
window_old = Window.partitionBy("raceId").orderBy(col("dob").asc_nulls_last(), col("driverId"))

# Find youngest driver per race
df_youngest = (
    df_all.withColumn("rank_young", row_number().over(window_young))
    .filter(col("rank_young") == 1)
    .select("raceId", "name", "surname", "Age")
    .withColumnRenamed("surname", "Youngest")
    .withColumnRenamed("Age", "Youngest_Age")
)

# Find oldest driver per race
df_oldest = (
    df_all.withColumn("rank_old", row_number().over(window_old))
    .filter(col("rank_old") == 1)
    .select("raceId", "surname", "Age")
    .withColumnRenamed("surname", "Oldest")
    .withColumnRenamed("Age", "Oldest_Age")
)

# Combine youngest + oldest results
df_age_extremes = df_youngest.join(df_oldest, on="raceId")

# Display the result
display(df_age_extremes)

In [0]:
# Q4: For a given race, which driver has the most wins and losses? Race 100/DriverID 4

# Race / driver to report on; change these to inspect another pair
TARGET_RACE_ID = 100
TARGET_DRIVER_ID = 4

# Parse race dates into a local frame (the shared df_races is left untouched)
df_races_dated = df_races.withColumn("date", to_date(col("date")))

# Join race and status info. positionOrder is cast explicitly so the
# win / loss tests below are numeric comparisons regardless of load options.
df_all = (
    df_results
    .withColumn("positionOrder", col("positionOrder").cast("int"))
    .join(df_races_dated.select("raceId", "date"), on="raceId", how="left")
    .join(df_status, on="statusId", how="left")
)

# For each driver, all of their races strictly before the current one (by date)
history_window = (
    Window.partitionBy("driverId")
    .orderBy("date")
    .rowsBetween(Window.unboundedPreceding, -1)
)

# Previous wins, and previous completed (status "Finished") non-win races.
# count() skips the nulls produced by when() for non-matching rows.
df_all = (
    df_all
    .withColumn("prev_wins", count(when(col("positionOrder") == 1, 1)).over(history_window))
    .withColumn(
        "prev_losses",
        count(when((col("positionOrder") > 1) & (col("status") == "Finished"), 1)).over(history_window),
    )
)

# Filter to the target race and driver (ids cast so the comparison is numeric)
df_final = df_all.filter(
    (col("raceId").cast("int") == TARGET_RACE_ID)
    & (col("driverId").cast("int") == TARGET_DRIVER_ID)
)

# Add the driver's surname
df_final = df_final.join(df_drivers.select("driverId", "surname"), on="driverId", how="left")

# Display the result
display(df_final.select("driverId", "surname", "prev_wins", "prev_losses"))


In [0]:
# Q5: Which driver had the most race completions (finishes) in each season?

# Join results with status and races
df_results_status = (
    df_results
    .join(df_status, on="statusId", how="left")
    .join(df_races.select("raceId", "year"), on="raceId", how="left")
)

# A completion is a result whose status is exactly "Finished"
df_finished = df_results_status.filter(col("status") == "Finished")

# Count finishes per driver per year
df_finishes = df_finished.groupBy("year", "driverId").agg(count("*").alias("num_finishes"))

# rank() rather than row_number(): when several drivers tie for the most
# finishes in a season, all of them are reported instead of one arbitrary pick.
finishes_window = Window.partitionBy("year").orderBy(col("num_finishes").desc())
df_top = (
    df_finishes
    .withColumn("rank", rank().over(finishes_window))
    .filter(col("rank") == 1)
)

# Add driver names
df_top_named = (
    df_top.join(df_drivers.select("driverId", "surname"), on="driverId", how="left")
    .select("year", "surname", "num_finishes")
)

# Display (surname makes the order of tied rows within a year deterministic)
display(df_top_named.orderBy("year", "surname"))
