In [0]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType
import pandas as pd
from datetime import datetime

In [0]:
df_races= spark.read.csv('s3://columbia-gr5069-main/raw/races.csv', header=True)
df_results = spark.read.csv('s3://columbia-gr5069-main/raw/results.csv', header=True)
df_drivers = spark.read.csv('s3://columbia-gr5069-main/raw/drivers.csv', header=True)
df_pit_stops = spark.read.csv('s3://columbia-gr5069-main/raw/pit_stops.csv', header=True)

In [0]:
# Question 1: Average Pit Stop Time by Driver for Each Race
# Calculate average pit stop time for each driver in each race
avg_pit_stop_times = df_pit_stops.groupBy("raceId", "driverId") \
    .agg(avg("duration").alias("avg_pit_stop_time")) \
    .join(df_races, "raceId") \
    .join(df_drivers, "driverId") \
    .select(
        col("raceId"), 
        col("name").alias("race_name"), 
        col("year"), 
        col("driverId"), 
        concat_ws(" ", col("forename"), col("surname")).alias("driver_name"), 
        col("avg_pit_stop_time")
    ) \
    .orderBy("year", "raceId", "avg_pit_stop_time")

display(avg_pit_stop_times)

raceId,race_name,year,driverId,driver_name,avg_pit_stop_time
841,Australian Grand Prix,2011,18,Jenson Button,20.950333333333333
841,Australian Grand Prix,2011,1,Lewis Hamilton,23.213
841,Australian Grand Prix,2011,20,Sebastian Vettel,23.3195
841,Australian Grand Prix,2011,815,Sergio Pérez,23.438
841,Australian Grand Prix,2011,3,Nico Rosberg,23.716
841,Australian Grand Prix,2011,10,Timo Glock,23.792
841,Australian Grand Prix,2011,2,Nick Heidfeld,24.046
841,Australian Grand Prix,2011,4,Fernando Alonso,24.055000000000003
841,Australian Grand Prix,2011,17,Mark Webber,24.058666666666667
841,Australian Grand Prix,2011,155,Kamui Kobayashi,24.128


In [0]:
# Question 2: Rank Average Pit Stop Time by Race Winners
# Get race winners
race_winners = df_results.filter(col("position") == 1) \
    .select("raceId", "driverId")

# Join with average pit stop times and rank
pit_stop_ranking = avg_pit_stop_times \
    .join(race_winners, ["raceId", "driverId"]) \
    .select(
        "raceId", 
        "race_name", 
        "year", 
        "driver_name", 
        "avg_pit_stop_time"
    ) \
    .orderBy("year", "raceId", "avg_pit_stop_time")

display(pit_stop_ranking)

raceId,race_name,year,driver_name,avg_pit_stop_time
841,Australian Grand Prix,2011,Sebastian Vettel,23.3195
842,Malaysian Grand Prix,2011,Sebastian Vettel,22.40833333333333
843,Chinese Grand Prix,2011,Lewis Hamilton,20.659333333333333
844,Turkish Grand Prix,2011,Sebastian Vettel,20.40225
845,Spanish Grand Prix,2011,Sebastian Vettel,20.2145
846,Monaco Grand Prix,2011,Sebastian Vettel,28.536
847,Canadian Grand Prix,2011,Jenson Button,23.511166666666668
848,European Grand Prix,2011,Sebastian Vettel,20.59266666666667
849,British Grand Prix,2011,Fernando Alonso,24.671333333333333
850,German Grand Prix,2011,Lewis Hamilton,20.627


In [0]:
# Question 3: Insert Missing Driver Codes
drivers_missing_code = df_drivers.filter(col("code").isNull())

# Create a function to generate code from driver name (typically first 3 letters of surname)
def generate_code(forename, surname):
    if surname and len(surname) >= 3:
        return surname[:3].upper()
    elif forename and len(forename) >= 3:
        return forename[:3].upper()
    else:
        return "UNK"  # Unknown

# Register UDF
generate_code_udf = udf(generate_code)

# Update drivers with missing codes
updated_drivers = df_drivers.withColumn(
    "code",
    when(col("code").isNull(), 
         generate_code_udf(col("forename"), col("surname"))
    ).otherwise(col("code"))
)

display(updated_drivers)


driverId,driverRef,number,code,forename,surname,dob,nationality,url
1,hamilton,44,HAM,Lewis,Hamilton,1985-01-07,British,http://en.wikipedia.org/wiki/Lewis_Hamilton
2,heidfeld,\N,HEI,Nick,Heidfeld,1977-05-10,German,http://en.wikipedia.org/wiki/Nick_Heidfeld
3,rosberg,6,ROS,Nico,Rosberg,1985-06-27,German,http://en.wikipedia.org/wiki/Nico_Rosberg
4,alonso,14,ALO,Fernando,Alonso,1981-07-29,Spanish,http://en.wikipedia.org/wiki/Fernando_Alonso
5,kovalainen,\N,KOV,Heikki,Kovalainen,1981-10-19,Finnish,http://en.wikipedia.org/wiki/Heikki_Kovalainen
6,nakajima,\N,NAK,Kazuki,Nakajima,1985-01-11,Japanese,http://en.wikipedia.org/wiki/Kazuki_Nakajima
7,bourdais,\N,BOU,Sébastien,Bourdais,1979-02-28,French,http://en.wikipedia.org/wiki/S%C3%A9bastien_Bourdais
8,raikkonen,7,RAI,Kimi,Räikkönen,1979-10-17,Finnish,http://en.wikipedia.org/wiki/Kimi_R%C3%A4ikk%C3%B6nen
9,kubica,88,KUB,Robert,Kubica,1984-12-07,Polish,http://en.wikipedia.org/wiki/Robert_Kubica
10,glock,\N,GLO,Timo,Glock,1982-03-18,German,http://en.wikipedia.org/wiki/Timo_Glock


In [0]:
#Question 4: Youngest and Oldest Driver for Each Race
# Calculate driver age for each race
races_with_date = df_races.select("raceId", "year", to_date(col("date")).alias("race_date"))
drivers_with_dob = df_drivers.select("driverId", "forename", "surname", to_date(col("dob")).alias("birth_date"))

# Join datasets and calculate age
drivers_age = df_results.select("raceId", "driverId") \
    .join(races_with_date, "raceId") \
    .join(drivers_with_dob, "driverId") \
    .withColumn("driver_name", concat_ws(" ", col("forename"), col("surname"))) \
    .withColumn("age", floor(datediff(col("race_date"), col("birth_date")) / 365.25))

# Find youngest and oldest driver for each race
window_youngest = Window.partitionBy("raceId").orderBy("age")
window_oldest = Window.partitionBy("raceId").orderBy(desc("age"))

youngest_oldest_drivers = drivers_age \
    .withColumn("rank_youngest", row_number().over(window_youngest)) \
    .withColumn("rank_oldest", row_number().over(window_oldest)) \
    .filter((col("rank_youngest") == 1) | (col("rank_oldest") == 1)) \
    .withColumn("driver_status", 
                when(col("rank_youngest") == 1, "Youngest")
                .when(col("rank_oldest") == 1, "Oldest")
                .otherwise(None)) \
    .select("raceId", "year", "driver_name", "age", "driver_status") \
    .orderBy("year", "raceId", "driver_status")

display(youngest_oldest_drivers)


raceId,year,driver_name,age,driver_status
833,1950,Philippe Étancelin,53,Oldest
833,1950,Geoff Crossley,29,Youngest
834,1950,Philippe Étancelin,53,Oldest
834,1950,José Froilán González,27,Youngest
835,1950,Mauri Rose,44,Oldest
835,1950,Troy Ruttman,20,Youngest
836,1950,Philippe Étancelin,53,Oldest
836,1950,Harry Schell,28,Youngest
837,1950,Philippe Étancelin,53,Oldest
837,1950,Geoff Crossley,29,Youngest


In [0]:
# Question 5: Driver with Most Wins and Losses for Each Race
# Calculate wins and losses for each driver
driver_results = df_results \
    .withColumn("result", 
                when(col("position") == 1, "win")
                .otherwise("loss")) \
    .groupBy("driverId") \
    .pivot("result", ["win", "loss"]) \
    .count() \
    .na.fill(0) \
    .join(df_drivers, "driverId") \
    .withColumn("driver_name", concat_ws(" ", col("forename"), col("surname"))) \
    .select("driverId", "driver_name", "win", "loss")

# Find driver with most wins and losses
window_most_wins = Window.orderBy(desc("win"))
window_most_losses = Window.orderBy(desc("loss"))

drivers_most_wins_losses = driver_results \
    .withColumn("rank_wins", row_number().over(window_most_wins)) \
    .withColumn("rank_losses", row_number().over(window_most_losses)) \
    .filter((col("rank_wins") == 1) | (col("rank_losses") == 1)) \
    .withColumn("status", 
                when(col("rank_wins") == 1, "Most Wins")
                .when(col("rank_losses") == 1, "Most Losses")
                .otherwise(None)) \
    .select("driver_name", "win", "loss", "status") \
    .orderBy("status")

display(drivers_most_wins_losses)


driver_name,win,loss,status
Fernando Alonso,32,338,Most Losses
Lewis Hamilton,103,219,Most Wins


In [0]:
# Question 6: Does age correlates with race placings?
# Join datasets and calculate age
drivers_age = df_results.select("raceId", "driverId", "position") \
    .join(races_with_date, "raceId") \
    .join(drivers_with_dob, "driverId") \
    .withColumn("driver_name", concat_ws(" ", col("forename"), col("surname"))) \
    .withColumn("age", floor(datediff(col("race_date"), col("birth_date")) / 365.25)) \
    .withColumn("position", col("position").cast(IntegerType()))


# Calculate correlation between age and position
correlation = drivers_age.stat.corr("age", "position")
print(f"Correlation between driver age and race position: {correlation}")

Correlation between driver age and race position: -0.11215458464163552
