In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

In [0]:
df_pit_stops = spark.read.csv('s3://columbia-gr5069-main/raw/pit_stops.csv', header = True)
df_results = spark.read.csv('s3://columbia-gr5069-main/raw/results.csv', header = True)
df_drivers = spark.read.csv('s3://columbia-gr5069-main/raw/drivers.csv', header = True)
df_races= spark.read.csv('s3://columbia-gr5069-main/raw/races.csv', header = True)

### 1. What was the average time each driver spent at the pit stop for each race?

In [0]:
display(df_pit_stops.limit(10))

In [0]:
avg_pit_stop_times = df_pit_stops.groupBy("raceId", "driverId") \
                              .agg(
                                  F.avg("duration").alias("avg_duration"),
                                  F.avg("milliseconds").alias("avg_milliseconds"),
                                  F.count("stop").alias("num_pit_stops")
                              ) \
                              .orderBy(F.col("raceId").cast("int"), "avg_milliseconds")

display(avg_pit_stop_times.limit(10))

### 2. Rank the average time spent at the pit stop in order of who won each race

In [0]:
display(df_results.limit(10))

In [0]:
avg_pit_stop_times = df_pit_stops.groupBy("raceId", "driverId") \
                          .agg(
                              F.avg("duration").alias("avg_duration"),
                              F.avg("milliseconds").alias("avg_milliseconds"),
                              F.count("stop").alias("num_pit_stops")
                          )

pit_stops_with_results = avg_pit_stop_times.join(
    df_results.select("raceId", "driverId", "position", "positionText", "positionOrder"),
    on=["raceId", "driverId"],
    how="inner"
)

pit_stops_with_results = pit_stops_with_results.withColumn(
    "finished_race",
    F.when((F.col("positionText") != "R") & (F.col("position").isNotNull()), True).otherwise(False)
)

ranked_pit_stops = pit_stops_with_results.orderBy(
    F.col("raceId").cast("int"),
    F.col("positionOrder").cast("int")
)

display(ranked_pit_stops.limit(10))

**My approach to handling drivers who did not finish the race:**
1. I created a boolean column finished_race that identifies drivers who finished (position is not null and positionText is not "R") versus those who didn't finish.
2. I included both finishers and non-finishers in the main analysis, keeping them ordered by their positionOrder value, which ensures that even non-finishers are ranked correctly based on how far they progressed in the race before retiring.

### 3. Insert the missing code (e.g: ALO for Alonso) for drivers based on the 'drivers' dataset

In [0]:
display(df_drivers.limit(10))

In [0]:
df_drivers_clean = df_drivers.na.replace('\\N', None)

generate_code_udf = F.udf(lambda surname: surname[:3].upper() if surname else None)

df_drivers_with_codes = df_drivers_clean.withColumn(
    "code",
    F.when(F.col("code").isNull(), generate_code_udf(F.col("surname"))).otherwise(F.col("code"))
)

display(df_drivers_with_codes.limit(20))

**Method for generating missing driver codes:**
1. First, I ensure that the '\\N' character is correctly recognized as NULL value
2. Then, for drivers missing codes, I directly extract the first three letters from their surname and convert to uppercase as the code

### 4. Who is the youngest and oldest driver for each race? Create a new column called “Age”

In [0]:
display(df_races.limit(10))

In [0]:
race_results = df_results.join(df_races, on="raceId")

driver_race_data = race_results.join(df_drivers, on="driverId")

driver_race_data = driver_race_data.withColumn(
    "Age", 
    F.floor(F.months_between(F.to_date("date"), F.to_date("dob")) / 12)
)

window_youngest = Window.partitionBy("raceId").orderBy("Age")
window_oldest = Window.partitionBy("raceId").orderBy(F.desc("Age"))

youngest_drivers = driver_race_data.withColumn(
    "row_number", F.row_number().over(window_youngest)
).filter(F.col("row_number") == 1).select(
    "raceId", 
    "year", 
    "name",
    F.col("driverId").alias("youngest_driverId"),
    F.concat(F.col("forename"), F.lit(" "), F.col("surname")).alias("youngest_driver"),
    F.col("Age").alias("youngest_age")
)

oldest_drivers = driver_race_data.withColumn(
    "row_number", F.row_number().over(window_oldest)
).filter(F.col("row_number") == 1).select(
    "raceId", 
    F.col("driverId").alias("oldest_driverId"),
    F.concat(F.col("forename"), F.lit(" "), F.col("surname")).alias("oldest_driver"),
    F.col("Age").alias("oldest_age")
)

age_analysis = youngest_drivers.join(
    oldest_drivers, 
    on="raceId"
).select(
    "raceId", 
    "year", 
    "name", 
    "youngest_driverId",
    "youngest_driver", 
    "youngest_age", 
    "oldest_driverId",
    "oldest_driver", 
    "oldest_age"
)

display(age_analysis.orderBy(F.col("raceId").cast("int")).limit(20))

**How the Age calculation works:**
1. We calculate the age of each driver at the time of each race by:    
- Taking the difference between the race date and driver birth date in     
- Dividing by 12 to convert     
- Using floor() to get the integer value (representing completed years of life)
2. We then find the minimum and maximum ages for each race to identify the youngest and oldest drivers.

### 5. For a given race, which driver has the most wins and losses?

In [0]:
df_race_results = df_results.withColumn(
    "is_win", F.when(F.col("position") == 1, 1).otherwise(0)
).withColumn(
    "is_loss", F.when((F.col("position").isNotNull()) & (F.col("position") != 1), 1).otherwise(0)
)

df_races_dated = df_races.select("raceId", "name", "date").orderBy("date")
df_results_dated = df_race_results.join(df_races_dated, on="raceId")

df_prior_races = df_results_dated.alias("current").join(
    df_results_dated.alias("prior"),
    (F.col("current.driverId") == F.col("prior.driverId")) & 
    (F.col("prior.date") < F.col("current.date")),
    "left"
)

df_driver_stats = df_prior_races.groupBy(
    "current.raceId", "current.driverId"
).agg(
    F.sum("prior.is_win").alias("prev_wins"),
    F.sum("prior.is_loss").alias("prev_losses")
)

window_wins = Window.partitionBy("raceId").orderBy(F.desc("prev_wins"))
window_losses = Window.partitionBy("raceId").orderBy(F.desc("prev_losses"))

df_ranked = df_driver_stats.withColumn(
    "win_rank", F.row_number().over(window_wins)
).withColumn(
    "loss_rank", F.row_number().over(window_losses)
)

df_most_wins = df_ranked.filter(F.col("win_rank") == 1).select(
    "raceId", 
    F.col("driverId").alias("wins_driver_id"),
    F.col("prev_wins").alias("wins_count")
)

df_most_losses = df_ranked.filter(F.col("loss_rank") == 1).select(
    "raceId", 
    F.col("driverId").alias("losses_driver_id"),
    F.col("prev_losses").alias("losses_count")
)

df_drivers_wins = df_drivers.alias("drivers_wins")
df_drivers_losses = df_drivers.alias("drivers_losses")

final_results = df_most_wins.join(
    df_most_losses, 
    on="raceId", 
    how="full_outer"
).join(
    df_drivers_wins.select(
        F.col("driverId"),
        F.concat(F.col("forename"), F.lit(" "), F.col("surname")).alias("wins_driver_name")
    ),
    F.col("wins_driver_id") == df_drivers_wins.driverId,
    "left"
).join(
    df_drivers_losses.select(
        F.col("driverId"),
        F.concat(F.col("forename"), F.lit(" "), F.col("surname")).alias("losses_driver_name")
    ),
    F.col("losses_driver_id") == df_drivers_losses.driverId,
    "left"
).join(
    df_races_dated.select("raceId", "name"),
    on="raceId",
    how="left"
).select(
    "raceId",
    "name",
    "wins_driver_name",
    "wins_count",
    "losses_driver_name",
    "losses_count"
)

display(final_results.orderBy(F.col("raceId").cast("int")).limit(20))

### 6. My Question: Which country's drivers have won the most races?

In [0]:
winning_results = df_results.filter(F.col("position") == 1)

winning_drivers = winning_results.join(df_drivers, on="driverId")

wins_by_nationality = winning_drivers.groupBy("nationality").agg(
    F.count("resultId").alias("total_wins")
)

display(wins_by_nationality.orderBy(F.col("total_wins").desc()))

This analysis reveals that **British** drivers have dominated Formula 1 with 309 race wins significantly ahead of **German** (179) and **Brazilian** (101) drivers. **_European nations_** comprise most of the top positions, though several non-European countries like **Brazil**, **Australia**, and **Argentina** have also made significant contributions to F1 racing history.