In [0]:
from pyspark.sql.functions import when, col, round, avg, sum, count, desc, row_number
from pyspark.sql.functions import floor, months_between, to_date, min, max, first
from pyspark.sql.functions import rank, coalesce, lit, upper, regexp_replace
from pyspark.sql.window import Window


In [0]:
# Raw lap-by-lap timing data, loaded with the header row as column names
# (no schema inference, so every column is a string).
df_laptimes = spark.read.format("csv").option("header", True).load('s3://columbia-gr5069-main/raw/lap_times.csv')
display(df_laptimes)

In [0]:
# Driver reference table (used later for `code` and `dob`), loaded as raw strings.
df_drivers = (
    spark.read
    .format("csv")
    .option("header", True)
    .load('s3://columbia-gr5069-main/raw/drivers.csv')
)
display(df_drivers)
df_drivers.count()

In [0]:
# Pit-stop records, loaded with a header row; all columns are strings until cast.
df_pitstops = (
    spark.read
    .format("csv")
    .option("header", True)
    .load('s3://columbia-gr5069-main/raw/pit_stops.csv')
)
display(df_pitstops)

### 1. Average time at the pits
Provide the average time that each driver spent at the pit stop for each race.

In [0]:
# First pass: average pit-stop duration per driver per race.
# raceId is still a string at this point, so orderBy sorts it lexicographically.
df_avg_pit_time = df_pitstops.groupBy("raceId", "driverId").agg(
    avg("duration").alias("avg_pit_time")
)
df_avg_pit_time = df_avg_pit_time.orderBy("raceId", "avg_pit_time")
df_avg_pit_time.show()



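Weird... why do the raceIds start at 1000 instead of 841? Because `raceId` was read as a string, the sort is lexicographic ("1000" sorts before "841"), so the next step checks the schema.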

In [0]:
# Inspect the column types of the pit-stop table: it was read without
# inferSchema, so raceId (and every other column) should show as a string.
df_pitstops.printSchema()

In [0]:
# raceId was loaded as a string, so it sorted lexicographically ("1000" < "841").
# Cast it to int so races sort numerically.
df_pitstops = df_pitstops.withColumn("raceId", col("raceId").cast("int"))

# Re-run: average pit-stop time per driver per race, in seconds, rounded to 3 decimals.
# Average the integer `milliseconds` column rather than the string `duration`.
# avg("duration") depends on an implicit string->double cast: any value that is not
# a plain number becomes null and is silently skipped by avg().
# NOTE(review): long stops appear to be stored in `duration` as "m:ss.sss"; confirm.
df_avg_pit_time = (
    df_pitstops
    .groupBy("raceId", "driverId")
    .agg(round(avg(col("milliseconds").cast("double")) / 1000, 3).alias("avg_pit_time"))
    .orderBy("raceId", "avg_pit_time")
)
df_avg_pit_time.show()

### 2. Rank average pit stop
Rank the average time spent at the pit stop (from the previous question), and use the finishing order of the race to rank-order these average times. Add in your comments how you decided to deal with drivers who did not finish the race.

In [0]:
# Race results, including each driver's finishing order (positionOrder); all columns are strings.
df_results = spark.read.format("csv").option("header", True).load('s3://columbia-gr5069-main/raw/results.csv')


In [0]:
# Keep the join keys, the finishing position and the finishing status.
# statusId is kept so drivers who did not finish (DNF) can be told apart from finishers.
df_results_filtered = df_results.select("raceId", "driverId", "positionOrder", "statusId")

# Attach finishing position/status to each driver's average pit-stop time.
# Left join: every (race, driver) pit-stop average is kept even without a result row.
df_combined = df_avg_pit_time.join(df_results_filtered, on=["raceId", "driverId"], how="left")
df_combined.show()

In [0]:
# Order each race by finishing position. positionOrder is still a string here,
# so this sort is lexicographic (e.g. "10" sorts before "2").
df_combined = df_combined.sort(col("raceId"), col("positionOrder"))
df_combined.show()

In [0]:
# Cast positionOrder to int, then re-sort so positions order numerically within each race.
df_combined = (
    df_combined
    .withColumn("positionOrder", col("positionOrder").cast("int"))
    .sort("raceId", "positionOrder")
)
df_combined.show()

I ranked the average pit-stop times within each race by ordering the drivers by their finishing position (`positionOrder`). This lets us observe pit-stop efficiency in the context of the race outcome.
I kept drivers who did not finish the race (DNF) in the dataset rather than dropping them. Because they did not complete the race, their finishing position may be less meaningful.

### 3. insert the missing code
Insert a three-letter code where codes are missing in the driver dataset. Write in comments how you arrived at these codes.

In [0]:
# Look at the driver table's column types, then a sample of its rows.
df_drivers.printSchema()
df_drivers.show(n=20, truncate=True)



The schema shows that the column `code` is a nullable string, so I check for missing three-letter driver codes by filtering that column for null values.

In [0]:
# Find drivers whose three-letter code is missing.
# NOTE(review): `code` is a nullable string, but the raw CSVs appear to mark missing
# values with the literal text "\N". spark.read.csv keeps that as a string unless the
# nullValue option is set, so isNull() alone can find nothing. Treat both real nulls
# and "\N" as missing; confirm against df_drivers.show().
missing_code = col("code").isNull() | (col("code") == "\\N")

df_missing_code = df_drivers.filter(missing_code)
df_missing_code.select("driverId", "forename", "surname").show()

# Insert a code for those drivers: the first three letters of the surname,
# upper-cased, after removing whitespace, apostrophes and hyphens
# (e.g. "de la Rosa" -> "DEL"). This is a heuristic, not an official code;
# drivers that already have a code keep it unchanged.
df_drivers_coded = df_drivers.withColumn(
    "code",
    when(
        missing_code,
        upper(regexp_replace(col("surname"), "[\\s'\\-]", "")).substr(1, 3),
    ).otherwise(col("code")),
)

# Show the codes that were filled in.
df_drivers_coded.join(df_missing_code.select("driverId"), on="driverId", how="inner") \
    .select("driverId", "forename", "surname", "code").show()

df_missing_code.count()

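The filter returned no null values, which suggests that all drivers already have a code, so no manual insert appears to be needed at this stage. Caveat: `isNull()` only catches true nulls. If the raw CSV marks missing values with a placeholder string such as `\N`, those rows are not counted and should be checked separately.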

### 4. Youngest and oldest driver
- Create a new column called “Age” that counts how many birthdays each driver has had in their life
- Explain how you reached this number in your comments
- Identify the oldest and youngest driver for each race

In [0]:
# Race dates, keyed by raceId.
df_races = spark.read.csv('s3://columbia-gr5069-main/raw/races.csv', header=True).select("raceId", "date")

# Date of birth, keyed by driverId.
df_driver_dob = df_drivers.select("driverId", "dob")

# One row per (race, driver) result entry, enriched with the driver's dob and the
# race date. Left joins keep entries even when dob or date is missing.
df_driver_age = (
    df_results.select("raceId", "driverId")
    .join(df_driver_dob, "driverId", "left")
    .join(df_races, "raceId", "left")
)

In [0]:
# Parse the string dates into DateType.
df_driver_age = df_driver_age.withColumn("race_date", to_date("date"))
df_driver_age = df_driver_age.withColumn("dob", to_date("dob"))

# Age = number of birthdays had by race day:
# months between dob and race date, divided by 12, floored to whole years.
df_driver_age = df_driver_age.withColumn(
    "Age",
    floor(months_between(col("race_date"), col("dob")) / 12)
)

# Rows without a computable Age (missing dob or race date) are excluded before ranking.
# Spark sorts NULLs first in ascending order, so a null Age would otherwise be
# ranked 1 and reported as the youngest driver of the race.

# Youngest driver(s) per race; rank() keeps every driver tied at the minimum age.
window_young = Window.partitionBy("raceId").orderBy("Age")
df_youngest = (
    df_driver_age
    .filter(col("Age").isNotNull())
    .withColumn("rank_young", rank().over(window_young))
    .filter(col("rank_young") == 1)
    .select("raceId", col("driverId").alias("youngest_driverId"), col("Age").alias("youngest_age"))
)

# Oldest driver(s) per race; rank() keeps every driver tied at the maximum age.
window_old = Window.partitionBy("raceId").orderBy(col("Age").desc())
df_oldest = (
    df_driver_age
    .filter(col("Age").isNotNull())
    .withColumn("rank_old", rank().over(window_old))
    .filter(col("rank_old") == 1)
    .select("raceId", col("driverId").alias("oldest_driverId"), col("Age").alias("oldest_age"))
)

# Pair youngest and oldest per race (ties yield several rows per race).
# raceId is a string here; cast it so races sort numerically.
df_extremes = (
    df_youngest.join(df_oldest, on="raceId", how="inner")
    .withColumn("raceId", col("raceId").cast("int"))
    .orderBy("raceId")
)
df_extremes.show()

I computed each driver's age at the time of the race from their date of birth and the race date.
I used the formula `months_between(race_date, dob) / 12` and applied `floor()` so that only full birthdays are counted (integer age).
Then I partitioned the data by raceId and ranked drivers by Age with a window `rank()`: ascending for the youngest and descending for the oldest. In each race I kept the drivers ranked 1.
In some races, multiple drivers had the same age in whole years, resulting in ties for youngest or oldest. Because `rank()` gives tied drivers the same rank, I retained all tied drivers to fully reflect the data.

### 5. Most wins and losses per race
- For a given race, provide a count of how many times a driver has won previous races, and a count of all the times a driver has not won (but completed) a race

In [0]:
# Look up what each statusId means and attach that text to every result row.
df_results_raw = spark.read.format("csv").option("header", True).load("s3://columbia-gr5069-main/raw/results.csv")

df_status = spark.read.format("csv").option("header", True).load('s3://columbia-gr5069-main/raw/status.csv')
df_status.show()
# Rename so the status text column is unambiguous after the join.
df_status = df_status.withColumnRenamed("status", "statusText")

df_results_1 = df_results_raw.join(df_status, "statusId", "left")
df_results_1.select("raceId", "driverId", "statusId", "statusText").show()

In [0]:
# Binary flags per result row:
#   is_win               - positionOrder is 1
#   is_completed         - statusText contains "Finished" or matches "+N Lap(s)"
#   is_completed_not_win - completed the race without winning it
df_results_2 = (
    df_results_1
    .withColumn(
        "is_win",
        when(col("positionOrder") == 1, 1).otherwise(0),
    )
    .withColumn(
        "is_completed",
        when(
            col("statusText").contains("Finished")
            | col("statusText").rlike("\\+[0-9]+ Laps?"),
            1,
        ).otherwise(0),
    )
    .withColumn(
        "is_completed_not_win",
        when((col("positionOrder") != 1) & (col("is_completed") == 1), 1).otherwise(0),
    )
)

df_results_2.select(
    "raceId", "driverId", "positionOrder", "statusText",
    "is_win", "is_completed", "is_completed_not_win",
).show()

Now that the race result status is encoded as binary flags, we can count how many times a driver has won previous races.

In [0]:
# Race dates, parsed to DateType so each driver's results can be put in chronological order.
df_races = spark.read.csv('s3://columbia-gr5069-main/raw/races.csv', header=True, inferSchema=True) \
    .select("raceId", "date") \
    .withColumn("race_date", to_date(col("date")))

# Attach the race date to every flagged result row.
df_results_with_date = df_results_2.join(df_races.select("raceId", "race_date"), on="raceId", how="left")

# Per driver, the frame is every earlier result row (up to, but excluding, the current one).
# NOTE(review): if a driver has two rows on the same race_date, their relative order is arbitrary.
window_spec = Window.partitionBy("driverId").orderBy("race_date").rowsBetween(Window.unboundedPreceding, -1)

# Cumulative prior wins and prior completed-but-not-won races.
# sum() over an empty frame (a driver's first race) is null, so default it to 0.
df_driver_history = df_results_with_date \
    .withColumn("total_wins_before", coalesce(sum("is_win").over(window_spec), lit(0))) \
    .withColumn("total_completes_not_win_before", coalesce(sum("is_completed_not_win").over(window_spec), lit(0)))

# raceId/driverId are strings here; cast for a numeric display order.
df_driver_history.select(
    "raceId", "driverId", "race_date", "positionOrder",
    "is_win", "is_completed_not_win",
    "total_wins_before", "total_completes_not_win_before"
).orderBy(col("raceId").cast("int"), col("driverId").cast("int")).show()

For each driver and each race, I calculated the cumulative number of wins, and of races completed but not won, before the current race.
I used a window function over the driver’s race history, ordered by race date and excluding the current race, and summed:
- `is_win` to get `total_wins_before`
- `is_completed_not_win` to get `total_completes_not_win_before`

### 6. own question - TOP Crusher 💸
Find the most costly drivers per season — the ones who crashed the most and caused the highest repair costs for their teams. 💸💸💸

In [0]:
# Load the tables this question needs. Note: this rebinds df_status, df_drivers,
# df_races and df_results (drivers and races are narrowed to a few columns).
df_status = spark.read.csv("s3://columbia-gr5069-main/raw/status.csv", header=True).withColumnRenamed("status", "statusText")
df_drivers = spark.read.csv("s3://columbia-gr5069-main/raw/drivers.csv", header=True, inferSchema=True) \
    .select("driverId", "forename", "surname")
df_races = spark.read.csv("s3://columbia-gr5069-main/raw/races.csv", header=True, inferSchema=True).select("raceId", "year")
df_results = spark.read.csv("s3://columbia-gr5069-main/raw/results.csv", header=True, inferSchema=True)

# Attach the status text and the season (year) to every result row.
df_incidents = df_results.join(df_status, on="statusId", how="left") \
                         .join(df_races, on="raceId", how="left")

# statusText values counted as crash-related incidents that likely incurred repair costs.
# isin() is an exact match: any other wording in status.csv must be listed explicitly.
crash_keywords = ["Collision", "Accident", "Spun off", "Damage"]

df_incidents = df_incidents.withColumn(
    "is_crash",
    when(col("statusText").isin(crash_keywords), 1).otherwise(0)
)

# Number of crash-related results per driver per season.
df_crash_counts = df_incidents.groupBy("year", "driverId") \
    .agg(count(when(col("is_crash") == 1, True)).alias("crash_count"))

# Top crasher(s) of each season.
# - rank() instead of row_number(): every driver tied on the highest count is kept,
#   rather than one being picked arbitrarily.
# - crash_count > 0: a season without crashes has no "top crasher".
window_spec = Window.partitionBy("year").orderBy(desc("crash_count"))

df_top_crashers = df_crash_counts \
    .filter(col("crash_count") > 0) \
    .withColumn("rank", rank().over(window_spec)) \
    .filter(col("rank") == 1)

# Attach driver names.
df_top_crashers_named = df_top_crashers.join(df_drivers, on="driverId", how="left")

# Show results; driverId orders tied drivers within a season deterministically.
df_top_crashers_named.select("year", "driverId", "forename", "surname", "crash_count") \
    .orderBy("year", "driverId").show()