In [0]:
# Author: Jay Jun
# Date: March 22, 2025
# Project: Applied Data Homework #2
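# Purpose: Answer the assignment questions about the F1 data stored on AWS S3, using Databricks with PySpark (the assignment allowed Pandas, R, or PySpark)
# Inputs: raw/pit_stops.csv, raw/drivers.csv, raw/results.csv, raw/races.csv, raw/status.csv, raw/qualifying.csv
# Outputs: one CSV folder per question under s3://jwj2123-gr5069/processed/Assignment #2/ (average_time, rank_average_time, missing_code, age, win_losses, driver_improvement)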

In [0]:
# Make sure the output prefix for this assignment exists in my S3 bucket
# before any of the cells below write their results into it.
assignment_output_dir = "s3://jwj2123-gr5069/processed/Assignment #2/"
dbutils.fs.mkdirs(assignment_output_dir)


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, trim, round, avg, concat_ws, rank, row_number, round, when, udf, year, datediff, count, lit, when, to_date, month, dayofmonth
from pyspark.sql.window import Window
import pyspark.sql.functions as F
from pyspark.sql.types import StringType

# Start (or attach to the already running) Spark session for this notebook.
spark = (
    SparkSession.builder
    .appName("F1 Data Analysis")
    .getOrCreate()
)


def _read_raw_csv(path):
    """Read one raw F1 CSV from S3, using its header row and an inferred schema."""
    return spark.read.csv(path, header=True, inferSchema=True)


# Raw F1 tables from the course bucket; each question below draws on a subset of them.
pit_stops_df = _read_raw_csv("s3://columbia-gr5069-main/raw/pit_stops.csv")
drivers_df = _read_raw_csv("s3://columbia-gr5069-main/raw/drivers.csv")
results_df = _read_raw_csv("s3://columbia-gr5069-main/raw/results.csv")
races_df = _read_raw_csv("s3://columbia-gr5069-main/raw/races.csv")
status_df = _read_raw_csv("s3://columbia-gr5069-main/raw/status.csv")
qualifying_df = _read_raw_csv("s3://columbia-gr5069-main/raw/qualifying.csv")


### Q1: [10 pts] What was the average time each driver spent at the pit stop for each race?


In [0]:
# Q1: average pit stop time (in seconds) for each driver in each race.
#
# drivers_df is intentionally NOT reassigned here: later cells still need its `code`
# and `dob` columns, so the name lookup uses a narrow local projection instead.
driver_names_df = drivers_df.select(
    "driverId",
    concat_ws(" ", trim(col("forename")), trim(col("surname"))).alias("driver_name"),
)

# `duration` is read as text, and long stops are written as "m:ss.sss", which cannot
# be cast to a number and would silently drop out of avg(). `milliseconds` is always
# numeric, so seconds are derived from it instead (rounded to 3 decimals).
avg_pit_stop_times = (
    pit_stops_df
    .groupBy("raceId", "driverId")
    .agg(round(avg("milliseconds") / 1000, 3).alias("avg_duration"))
)

# Attach driver names; within each race the fastest average is listed first.
result = (
    avg_pit_stop_times
    .join(driver_names_df, "driverId")
    .select("raceId", "driver_name", "avg_duration")
    .orderBy("raceId", "avg_duration")
)

# Show the results
result.show(100, False)

# Write a single CSV (with header) to the assignment's output folder on S3.
result.coalesce(1).write.mode('overwrite').option("header", True)\
    .csv("s3://jwj2123-gr5069/processed/Assignment #2/average_time/")


### Q2: [20 pts] Rank the average time spent at the pit stop in order of who won each race

In [0]:
# Q2: average pit stop time per driver per race, listed in finishing order.
# avg_pit_stop_time is in milliseconds, rounded to 2 decimal places.
avg_pit_stop_time_df = (
    pit_stops_df
    .groupBy("raceId", "driverId")
    .agg(round(avg("milliseconds"), 2).alias("avg_pit_stop_time"))
)

# Bring in the finishing order. `positionOrder` is used rather than `position`
# because `position` has no value for drivers who did not finish, whereas
# `positionOrder` assigns every entry a place.
avg_pit_stop_with_results = avg_pit_stop_time_df.join(
    results_df.select("raceId", "driverId", "positionOrder"),
    on=["raceId", "driverId"],
    how="left",
)

# Driver names. The three needed columns are selected explicitly so this cell's
# output does not depend on whether an earlier cell already narrowed drivers_df.
avg_pit_stop_with_results = avg_pit_stop_with_results.join(
    drivers_df.select("driverId", "forename", "surname"),
    on="driverId",
    how="left",
).withColumn("driver_name", concat_ws(" ", "forename", "surname"))

# Race names
avg_pit_stop_with_results = avg_pit_stop_with_results.join(
    races_df.select("raceId", "name"),
    on="raceId",
    how="left",
).withColumnRenamed("name", "race_name")

# Rank within each race by finishing order. Only drivers who made at least one pit
# stop are present, so finishing_rank is the rank among those drivers and can differ
# from positionOrder. asc_nulls_last() puts any driver missing from results (null
# positionOrder after the left join) at the bottom of the race.
window_spec = Window.partitionBy("raceId").orderBy(col("positionOrder").asc_nulls_last())
ranked_result = avg_pit_stop_with_results.withColumn("finishing_rank", rank().over(window_spec))

# Deterministic ordering for display and output
final_df = ranked_result.orderBy("raceId", "finishing_rank", "driverId")

# Displaying the results
final_df.select("raceId", "race_name", "driverId", "driver_name",
                "avg_pit_stop_time", "positionOrder", "finishing_rank").show(20)

# Write the ranked results as a single CSV (with header) to S3.
final_df.coalesce(1).write.mode('overwrite').option("header", True)\
    .csv("s3://jwj2123-gr5069/processed/Assignment #2/rank_average_time/")


### Q3: [20 pts] Insert the missing code (e.g., ALO for Alonso) for drivers based on the 'drivers' dataset

In [0]:
# Q3: fill in missing driver codes.
#
# Pattern observed among drivers that do have a code: the first three letters of the
# surname, upper-cased (e.g. ALO for Alonso). Missing codes appear in the raw CSV as
# the literal string "\N"; a real null is treated as missing too.
#
# Built-in column functions replace the original Python UDF: they run inside Spark
# (no per-row Python round trip) and yield null for a null surname instead of failing
# the job. Whitespace is removed first so multi-word surnames do not produce codes
# containing a space.
# NOTE: requires drivers_df to still carry its `code` column.
code_is_missing = col("code").isNull() | (col("code") == "\\N")
generated_code = F.upper(F.substring(F.regexp_replace(col("surname"), "\\s+", ""), 1, 3))

drivers_df_filled = drivers_df.withColumn(
    "code",
    when(code_is_missing, generated_code).otherwise(col("code")),
)

# Show the results
drivers_df_filled.select("driverId", "forename", "surname", "code").show(10)

# Write the filled-in driver table as a single CSV (with header) to S3.
drivers_df_filled.coalesce(1).write.mode('overwrite').option("header", True)\
    .csv("s3://jwj2123-gr5069/processed/Assignment #2/missing_code/")

### Q4: [20 pts] Who is the youngest and oldest driver for each race? Create a new column called “Age”


In [0]:
# Q4: youngest and oldest driver in each race, using a new "Age" column.
#
# Age is computed per race entry: results (one row per driver per race) is joined to
# drivers (date of birth) and races (race date). Dates are parsed into local frames
# so the shared drivers_df / races_df are not overwritten for later cells.
drivers_dob_df = drivers_df.select(
    "driverId", "forename", "surname", F.to_date("dob").alias("dob")
)
races_dated_df = races_df.select("raceId", "name", F.to_date("date").alias("date"))

# Only the join keys are needed from results.
joined_df = (
    results_df.select("raceId", "driverId")
    .join(drivers_dob_df, "driverId")
    .join(races_dated_df, "raceId")
)

# Whole years between birth and race day: the year difference, minus one if the
# driver's birthday had not yet come round by race day. A missing dob gives a null Age.
race_date, birth_date = F.col("date"), F.col("dob")
birthday_passed = (F.month(race_date) > F.month(birth_date)) | (
    (F.month(race_date) == F.month(birth_date))
    & (F.dayofmonth(race_date) >= F.dayofmonth(birth_date))
)
year_gap = F.year(race_date) - F.year(birth_date)
joined_df = joined_df.withColumn(
    "Age", F.when(birthday_passed, year_gap).otherwise(year_gap - 1)
)

# Keep drivers whose age equals the race's minimum or maximum (ties keep every match).
# If min == max (e.g. a single entrant), the row is labelled "Youngest".
window_spec = Window.partitionBy("raceId")
result_df = (
    joined_df
    .withColumn("min_age", F.min("Age").over(window_spec))
    .withColumn("max_age", F.max("Age").over(window_spec))
    .filter((F.col("Age") == F.col("min_age")) | (F.col("Age") == F.col("max_age")))
    .select(
        "raceId",
        "name",
        "date",
        "driverId",
        "forename",
        "surname",
        "Age",
        F.when(F.col("Age") == F.col("min_age"), "Youngest")
         .otherwise("Oldest").alias("Age_Category"),
    )
    .orderBy("raceId", "Age_Category")
)

# Show results
result_df.show()

# Write youngest/oldest drivers per race as a single CSV (with header) to S3.
result_df.coalesce(1).write.mode('overwrite').option("header", True)\
    .csv("s3://jwj2123-gr5069/processed/Assignment #2/age/")


### Q5: [20 pts] For a given race, which driver has the most wins and losses?

In [0]:
# Q5: for a chosen race, each driver's record across every race held BEFORE it:
# wins, other classified finishes, and non-finishes (DNFs).
#
# To look at a different race, change target_race_id.
target_race_id = 841

# A result counts as classified if its status is "Finished" or a lapped-but-classified
# finish ("+1 Lap", "+2 Laps", ...). Every other status is treated as a DNF.
status_text = F.trim(col("status"))
is_classified = (status_text == "Finished") | status_text.rlike(r"^\+\d+ Laps?$")
# Plain collect() instead of .rdd, which is unavailable on some Databricks clusters.
dnf_status_ids = [
    row["statusId"] for row in status_df.filter(~is_classified).select("statusId").collect()
]

# raceId is an identifier, not a guaranteed chronological order, so "before the given
# race" is decided by race date.
race_dates_df = races_df.select("raceId", F.to_date("date").alias("race_date"))
target_row = race_dates_df.filter(col("raceId") == target_race_id).first()
if target_row is None:
    raise ValueError(f"raceId {target_race_id} not found in races.csv")
target_date = target_row["race_date"]

earlier_race_ids_df = race_dates_df.filter(col("race_date") < target_date).select("raceId")
previous_races_df = results_df.join(earlier_race_ids_df, on="raceId", how="inner")

# 0/1 flags per race entry
is_dnf = col("statusId").isin(dnf_status_ids)
labeled_df = (
    previous_races_df
    .withColumn("win", when(col("positionOrder") == 1, 1).otherwise(0))
    .withColumn("not_completed", when(is_dnf, 1).otherwise(0))
    .withColumn(
        "completed_not_won",
        when((col("positionOrder") > 1) & ~is_dnf, 1).otherwise(0),
    )
)

# Per-driver totals
summary_df = labeled_df.groupBy("driverId").agg(
    F.sum("win").alias("wins"),
    F.sum("completed_not_won").alias("completed_not_won"),
    F.sum("not_completed").alias("not_completed"),
    count(lit(1)).alias("total_races"),
)

# Join with driver names
final_df = summary_df.join(drivers_df.select("driverId", "surname"), on="driverId", how="left")

# Display results, most wins first
final_df.select("surname", "wins", "completed_not_won", "not_completed", "total_races") \
    .orderBy("wins", ascending=False).show()

# Write the per-driver record as a single CSV (with header) to S3.
final_df.coalesce(1).write.mode('overwrite').option("header", True)\
    .csv("s3://jwj2123-gr5069/processed/Assignment #2/win_losses/")

### Q6: [10 pts] Continue exploring the data by answering your own question.


In [0]:
# Own question: which driver improves the most positions, on average, from their
# qualifying position to their race finishing position?

# Pair each qualifying entry with the same driver's finishing position in that race;
# the inner join drops entries that appear in only one of the two tables.
finish_order_df = results_df.select("raceId", "driverId", "positionOrder")
joined_df = qualifying_df.join(finish_order_df, on=["raceId", "driverId"], how="inner")

# Positive position_gain means the driver finished ahead of where they qualified.
position_diff_df = joined_df.withColumn(
    "position_gain", col("position") - col("positionOrder")
)

# Mean gain per driver, rounded to the nearest whole number.
avg_gain_df = (
    position_diff_df
    .groupBy("driverId")
    .agg(round(avg("position_gain")).alias("avg_position_gain"))
)

# Attach surnames for readability.
driver_surnames_df = drivers_df.select("driverId", "surname")
final_gain_df = avg_gain_df.join(driver_surnames_df, on="driverId", how="left")

# Biggest average improvers first.
display(
    final_gain_df
    .select("surname", "avg_position_gain")
    .orderBy(col("avg_position_gain").desc())
)

# Write the per-driver averages as a single CSV (with header) to S3.
final_gain_df.coalesce(1).write.mode('overwrite').option("header", True)\
    .csv("s3://jwj2123-gr5069/processed/Assignment #2/driver_improvement/")