In [0]:
from pyspark.sql.functions import (
    avg,
    coalesce,
    col,
    count,
    current_date,
    datediff,
    floor,
    lit,
    months_between,
    rank,
    regexp_replace,
    substring,
    upper,
    when,
    year,
)
from pyspark.sql.functions import max as max_, min as min_, sum as sum_
from pyspark.sql.window import Window


In [0]:
# Raw pit-stop records. The header row supplies column names; no schema is inferred,
# so every column is loaded as a string.
df_pitstops = (
    spark.read
    .option("header", True)
    .csv('s3://columbia-gr5069-main/raw/pit_stops.csv')
)

In [0]:
# Preview the raw pit-stop records (all columns are still strings at this point,
# since the CSV was read without schema inference)
display(df_pitstops)

In [0]:
# Calculate average pit stop time (ms) per driver per race.
# The CSV is read without schema inference, so every column is a string:
# - cast the IDs to int so ordering is numeric (841 before 1000), not lexicographic;
# - cast milliseconds explicitly instead of relying on avg()'s implicit string coercion.
avg_pit_time = (
    df_pitstops
    .withColumn("raceId", col("raceId").cast("int"))
    .withColumn("driverId", col("driverId").cast("int"))
    .groupBy("raceId", "driverId")
    .agg(avg(col("milliseconds").cast("double")).alias("avg_pit_stop_time"))
    .orderBy("raceId", "driverId")
)

display(avg_pit_time)

In [0]:
df_results = spark.read.csv('s3://columbia-gr5069-main/raw/results.csv', header = True)

# Both CSVs are read without schema inference, so every column arrives as a string.
# Cast the IDs to int so the final ordering is numeric (841 before 1000) and both join
# keys share a type; cast the measures used in the aggregation and ranking below.
df_pitstops = (
    df_pitstops
    .withColumn("raceId", col("raceId").cast("int"))
    .withColumn("driverId", col("driverId").cast("int"))
    .withColumn("milliseconds", col("milliseconds").cast("int"))
)
df_results = (
    df_results
    .withColumn("raceId", col("raceId").cast("int"))
    .withColumn("driverId", col("driverId").cast("int"))
    .withColumn("positionOrder", col("positionOrder").cast("int"))
)

# Calculate average pit stop time per driver per race
avg_pit_stop_time_df = (
    df_pitstops
    .groupBy("raceId", "driverId")
    .agg(avg("milliseconds").alias("avg_pit_stop_time_ms"))
)

# Join average pit stop time with race results to get finishing order.
# Inner join: only drivers with at least one recorded pit stop in a race are kept.
joined_df = avg_pit_stop_time_df.join(
    df_results.select("raceId", "driverId", "positionOrder"),
    ["raceId", "driverId"],
)

# Rank drivers within each race by finishing order (1 = best finisher among the
# drivers retained by the join above)
window_spec = Window.partitionBy("raceId").orderBy("positionOrder")
ranked_df = joined_df.withColumn("finishing_rank", rank().over(window_spec))

# Order results by race and rank (numeric now that raceId is an int)
final_df = ranked_df.orderBy("raceId", "finishing_rank")

display(final_df.select("raceId", "driverId", "positionOrder", "avg_pit_stop_time_ms", "finishing_rank"))

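I kept all drivers, including those who did not finish (DNF), with their original `positionOrder` values. DNF drivers are assigned `positionOrder` values after the classified finishers. As a result, they naturally receive a worse (numerically larger) `finishing_rank`. Note that the inner join keeps only drivers with at least one recorded pit stop, so `finishing_rank` is a driver's rank among the drivers who pitted in that race. For some analyses, excluding DNFs may give cleaner insights. This applies especially when asking whether pit-stop efficiency influences race completion.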

In [0]:
# Load drivers dataset
df_drivers = spark.read.csv('s3://columbia-gr5069-main/raw/drivers.csv', header = True)

# A missing code may be a real null (empty CSV field) or the literal string "\N"
# (the convention used by the Ergast-style F1 exports). Spark's CSV reader does not
# map "\N" to null by default, so check for both.
code_missing = col("code").isNull() | (col("code") == "\\N")

# Check which drivers are missing a code
df_drivers.select("driverId", "surname", "code").filter(code_missing).show()

# Insert missing codes:
# Approach: use the first three letters of the driver's surname (uppercase) as the
# default code. Non-letter characters (spaces, apostrophes, hyphens) are stripped
# first so a surname like "de la Rosa" yields "DEL" rather than "DE ".
drivers_df_filled = df_drivers.withColumn(
    "code",
    when(
        code_missing,
        upper(substring(regexp_replace(col("surname"), "[^\\p{L}]", ""), 1, 3)),
    ).otherwise(col("code")),
)

# View the updated dataframe (the filled copy; df_drivers itself is left unchanged)
display(drivers_df_filled.select("driverId", "surname", "code"))


In [0]:
# Convert dob column to DateType
df_drivers = df_drivers.withColumn("dob", col("dob").cast("date"))

# 'Age' = completed years between dob and today. months_between() counts calendar
# months, so floor(months / 12) only increments on the actual birthday. Dividing
# datediff() by 365 would ignore leap days and could count a birthday a few days early.
# NOTE(review): this is each driver's age *today*, not their age on race day; join the
# race date if age-at-race is what the question asks for.
df_drivers = df_drivers.withColumn("Age", floor(months_between(current_date(), col("dob")) / 12))

# Show sample of Age column
df_drivers.select("driverId", "dob", "Age").show()

df_results = spark.read.csv('s3://columbia-gr5069-main/raw/results.csv', header = True)
# Join results with drivers to include Age
race_driver_df = df_results.join(df_drivers.select("driverId", "Age"), on="driverId")

# Youngest and oldest driver age per race, computed in a single aggregation pass
age_range_df = race_driver_df.groupBy("raceId").agg(
    min_("Age").alias("youngest_age"),
    max_("Age").alias("oldest_age"),
)
youngest_df = age_range_df.select("raceId", "youngest_age")
oldest_df = age_range_df.select("raceId", "oldest_age")

# Show youngest and oldest drivers per race
display(youngest_df)
display(oldest_df)

In [0]:
df_results = spark.read.csv('s3://columbia-gr5069-main/raw/results.csv', header = True)
df_results = df_results.withColumn("raceId", col("raceId").cast("int")) \
                       .withColumn("driverId", col("driverId").cast("int")) \
                       .withColumn("positionOrder", col("positionOrder").cast("int"))

# raceId is a surrogate key and does not follow calendar order in this dataset, so
# "races before this one" must be defined by the race date.
# NOTE(review): assumes races.csv sits alongside the other raw files; confirm the path.
df_races = (
    spark.read.csv('s3://columbia-gr5069-main/raw/races.csv', header = True)
    .select(
        col("raceId").cast("int").alias("raceId"),
        col("date").cast("date").alias("race_date"),
    )
)
df_results = df_results.join(df_races, on="raceId", how="left")

# win: finished first. loss: entered the race but did not win.
# (statusId is a numeric key into the status table, not a status label, so it cannot
# be matched against the text "Finished".)
df_results = (
    df_results
    .withColumn("win_flag", when(col("positionOrder") == 1, 1).otherwise(0))
    .withColumn("loss_flag", when(col("positionOrder") > 1, 1).otherwise(0))
)

# Window: all of a driver's earlier races in chronological order, excluding the
# current row. raceId breaks ties so the ordering is deterministic.
window_spec = (
    Window.partitionBy("driverId")
    .orderBy("race_date", "raceId")
    .rowsBetween(Window.unboundedPreceding, -1)
)

# Cumulative wins and losses before each race. A driver's first race has no
# preceding rows, so the window sum is null there; report 0 instead.
df_results = (
    df_results
    .withColumn("cumulative_wins", coalesce(sum_("win_flag").over(window_spec), lit(0)))
    .withColumn("cumulative_losses", coalesce(sum_("loss_flag").over(window_spec), lit(0)))
)

display(df_results.select("raceId", "driverId", "cumulative_wins", "cumulative_losses").orderBy("raceId", "driverId"))





In [0]:
# Question: Which driver has the fastest lap on average?
df_laptimes = spark.read.csv('s3://columbia-gr5069-main/raw/lap_times.csv', header = True)


fastest_lap_df = df_laptimes.groupBy("driverId") \
                             .agg(avg("milliseconds").alias("avg_lap_time")) \
                             .orderBy("avg_lap_time")

display(fastest_lap_df)