In [0]:
from pyspark.sql.functions import datediff, current_date, avg
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col, max, min, when, substring, upper, floor, months_between, to_date, first, concat_ws
from pyspark.sql import functions as F
from pyspark.sql.window import Window

In [0]:
# Load the raw race results from S3, using the first CSV line as column names.
# No schema is supplied, so Spark keeps its default column types for CSV input.
df_results = (
    spark.read
    .option("header", True)
    .csv('s3://columbia-gr5069-main/raw/results.csv')
)
display(df_results)

In [0]:
# Load the raw pit-stop records from S3 (header row supplies the column names).
pit_stops_reader = spark.read.option("header", True)
df_pitstops = pit_stops_reader.csv('s3://columbia-gr5069-main/raw/pit_stops.csv')
display(df_pitstops)

# 1. What was the average time each driver spent at the pit stop for each race?

In [0]:
# Average pit-stop duration (in milliseconds) for every (race, driver) pair.
# The aggregate keeps Spark's default column name "avg(milliseconds)", which the
# sort below and later cells refer to by that exact name.
df_avg_duration = (
    df_pitstops
    .groupBy("raceId", "driverId")
    .agg(F.avg("milliseconds"))
    .orderBy("raceId", "avg(milliseconds)")
)
display(df_avg_duration)

In [0]:
# The previous output began at raceId 1000 instead of 841, so re-sort on the
# integer value of raceId (ascending), then by the average pit-stop time.
race_id_as_int = col("raceId").cast("int")
df_avg_duration = df_avg_duration.orderBy(race_id_as_int.asc(), "avg(milliseconds)")
display(df_avg_duration)

# 2.  Rank the average time spent at the pit stop in order of who won each race

In [0]:
# Attach each driver's race result to their average pit-stop time.
# The left join keeps every (race, driver) row from the pit-stop averages.
pit_avg_with_results = df_avg_duration.join(
    df_results,
    on=["raceId", "driverId"],
    how="left",
)

# Sort numerically by race and then by finishing order, and keep only the
# columns needed to read the ranking.
df_rank_avgpit = (
    pit_avg_with_results
    .orderBy(
        F.col("raceId").cast("int").asc(),
        F.col("positionOrder").cast("int").asc(),
    )
    .select("raceId", "driverId", "positionOrder", "avg(milliseconds)")
)

display(df_rank_avgpit)

I order the drivers by their final position to see who won each race. Drivers who did not finish the race are still included in the DataFrame.

# 3.  Insert the missing code (e.g: ALO for Alonso) for drivers based on the 'drivers' dataset

In [0]:
# Load the driver reference table from S3 (header row supplies the column names).
df_driver = spark.read.format("csv").option("header", True).load(
    's3://columbia-gr5069-main/raw/drivers.csv'
)
display(df_driver)

In [0]:
# Fill in the driver code only where it is missing; codes already present in the
# data are kept as they are (previously every code was overwritten).
# A code counts as missing when it is null, empty, or the literal "\N" marker.
# NOTE(review): "\N" is assumed to be how this CSV export encodes NULL - confirm
# against the raw drivers.csv.
code_is_missing = col("code").isNull() | col("code").isin("\\N", "")

# Generated code: first three characters of the surname, upper-cased (e.g. ALO).
generated_code = upper(substring(col("surname"), 1, 3))

df_driver = df_driver.withColumn(
    "code",
    when(code_is_missing, generated_code).otherwise(col("code")),
)
display(df_driver)


I take the first three characters of each driver's surname and capitalize them to make the code.

# 4. Who is the youngest and oldest driver for each race? Create a new column called “Age”

In [0]:
# Load the race table and keep only the identifying columns plus the race date.
df_races = (
    spark.read
    .option("header", True)
    .csv('s3://columbia-gr5069-main/raw/races.csv')
    .select("raceId", "year", "name", "date")
)
display(df_races)

In [0]:
# Start from the (raceId, driverId) pairs in the results, then attach the driver
# details and the race details with left joins.
race_entries = df_results.select("raceId", "driverId")
entries_with_driver = race_entries.join(df_driver, on="driverId", how="left")
df_driver_age = entries_with_driver.join(df_races, on="raceId", how="left")
display(df_driver_age)

In [0]:
# Build a single "driver_name" column of the form "<forename> <surname>".
full_name = concat_ws(" ", col("forename"), col("surname"))
df_driver_age = df_driver_age.withColumn("driver_name", full_name)


In [0]:
# Convert the race date and date of birth to date columns, then derive "Age":
# whole years between dob and race_date (months between the two, divided by 12,
# rounded down).
age_in_whole_years = floor(months_between(col("race_date"), col("dob")) / 12)
df_driver_age = (
    df_driver_age
    .withColumn("race_date", to_date("date"))
    .withColumn("dob", to_date("dob"))
    .withColumn("Age", age_in_whole_years)
)
display(df_driver_age)

Create a new column called “Age” that shows each driver's age at the time of a given race. I use months_between to calculate the number of months between the driver's date of birth and the race date, divide by 12 to convert it to years, and then use floor to get the integer age.

In [0]:
# Minimum and maximum driver age within each race.
youngest_age = F.min("Age").alias("youngest_age")
oldest_age = F.max("Age").alias("oldest_age")
age_stats = df_driver_age.groupBy("raceId").agg(youngest_age, oldest_age)
display(age_stats)


In [0]:
# Rows with a null Age (e.g. a date of birth that could not be parsed) are dropped
# before ranking: Spark sorts nulls first in ascending order, so such a row would
# otherwise be ranked as the "youngest" driver of its race.
drivers_with_age = df_driver_age.filter(col("Age").isNotNull())

# Age is a whole number of years, so ties within a race are common. dob breaks the
# tie by actual birth date and driver_name makes the pick repeatable across runs.
window_spec = Window.partitionBy("raceId").orderBy(
    F.col("Age").asc(), F.col("dob").desc(), F.col("driver_name").asc()
)
oldest_window_spec = Window.partitionBy("raceId").orderBy(
    F.col("Age").desc(), F.col("dob").asc(), F.col("driver_name").asc()
)

# Youngest driver per race: first row of each race when sorted youngest-first.
youngest_driver_df = (
    drivers_with_age
    .withColumn("rank", F.row_number().over(window_spec))
    .filter(col("rank") == 1)
    .select(
        "raceId",
        col("driver_name").alias("youngest_driver"),
        col("Age").alias("youngest_age"),
    )
)

# Oldest driver per race: first row of each race when sorted oldest-first.
oldest_driver_df = (
    drivers_with_age
    .withColumn("rank", F.row_number().over(oldest_window_spec))
    .filter(col("rank") == 1)
    .select(
        "raceId",
        col("driver_name").alias("oldest_driver"),
        col("Age").alias("oldest_age"),
    )
)

# One row per race with both the youngest and the oldest driver side by side.
result_df = youngest_driver_df.join(
    oldest_driver_df,
    on="raceId",
    how="inner"
).select(
    col("raceId").alias("race_id"),
    col("youngest_driver"),
    col("youngest_age"),
    col("oldest_driver"),
    col("oldest_age")
)

display(result_df)

# 5. For a given race, which driver has the most wins and losses?

In [0]:

# Reload the race results and load the status table that is joined to them on
# statusId in the next cell.
csv_with_header = spark.read.option("header", True)
df_results = csv_with_header.csv("s3://columbia-gr5069-main/raw/results.csv")
display(df_results)
df_status = csv_with_header.csv('s3://columbia-gr5069-main/raw/status.csv')
display(df_status)


In [0]:
# Attach the matching status row to every result (left join keeps all results).
df_results_new = df_results.join(
    df_status,
    how="left",
    on="statusId",
)
display(df_results_new)
     

In [0]:
# Flag every result row with three 0/1 indicator columns:
#   win               - 1 when positionOrder is 1
#   completed         - 1 when the status contains "Finished" or matches "+N Lap(s)"
#   completed_not_win - 1 when the race was completed but not won
#
# The pattern is written as a raw string: "\+" inside a normal Python string
# literal is an invalid escape sequence and triggers a warning on recent Python
# versions. The regex passed to Spark is unchanged.
lapped_status_pattern = r"\+[0-9]+ Laps?"

is_win = col("positionOrder") == 1
is_completed = col("status").contains("Finished") | col("status").rlike(lapped_status_pattern)

df_results_status = (
    df_results_new
    .withColumn("win", when(is_win, 1).otherwise(0))
    .withColumn("completed", when(is_completed, 1).otherwise(0))
    .withColumn(
        "completed_not_win",
        when((col("completed") == 1) & (col("positionOrder") != 1), 1).otherwise(0),
    )
)
display(df_results_status)




In [0]:
# Reload the race table with all of its columns; this replaces the narrowed
# df_races selected earlier in the notebook.
df_races = spark.read.format("csv").option("header", True).load(
    's3://columbia-gr5069-main/raw/races.csv'
)
display(df_races)

In [0]:
# Add a "driver_name" column ("<forename> <surname>") to the driver table so it
# can be joined onto the results below.
driver_full_name = concat_ws(" ", col("forename"), col("surname"))
df_driver = df_driver.withColumn("driver_name", driver_full_name)
display(df_driver)

In [0]:
# Enrich the flagged results with the race date and the driver's display name.
race_dates = df_races.select("raceId", "date")
driver_names = df_driver.select("driverId", "driver_name")
df_results_date = (
    df_results_status
    .join(race_dates, on="raceId", how="left")
    .join(driver_names, on="driverId", how="left")
)
display(df_results_date)

In [0]:
# Per driver, ordered by race date: the frame covers every earlier row and stops
# one row before the current one, so the sums below count only previous races.
window_spec = Window.partitionBy("driverId").orderBy("date").rowsBetween(Window.unboundedPreceding, -1)

# The frame is empty for a driver's first race, which makes the sum null;
# coalesce reports that case as 0 previous wins / completions instead.
df_driver_winlose = (
    df_results_date
    .withColumn(
        "all_wins_before",
        F.coalesce(F.sum("win").over(window_spec), F.lit(0)),
    )
    .withColumn(
        "all_completes_not_win_before",
        F.coalesce(F.sum("completed_not_win").over(window_spec), F.lit(0)),
    )
)

# Sort on the integer values of raceId and positionOrder, matching the casts used
# in the earlier cells; sorting the raw CSV values puts raceId 1000 before 841.
df_driver_winlose = df_driver_winlose.select(
    "raceId", "driverId", "driver_name", "date", "positionOrder",
    "win", "completed_not_win",
    "all_wins_before", "all_completes_not_win_before"
).orderBy(
    F.col("raceId").cast("int").asc(),
    F.col("positionOrder").cast("int").asc(),
)

display(df_driver_winlose)

# 6. own question
What position did each driver hold on their last completed lap in each race?

In [0]:
# Load the per-lap timing records from S3 (header row supplies the column names).
df_laptimes = (
    spark.read
    .option("header", True)
    .csv('s3://columbia-gr5069-main/raw/lap_times.csv')
)
display(df_laptimes)

In [0]:
# Cast lap to an integer so the maximum is taken over lap numbers, then find the
# highest lap recorded for each driver in each race.
lap_as_integer = col("lap").cast("integer")
df_laptimes = df_laptimes.withColumn("lap", lap_as_integer)

last_lap = F.max("lap").alias("last_lap")
max_laps = df_laptimes.groupBy("raceId", "driverId").agg(last_lap)
display(max_laps)

In [0]:
# Join each driver's last recorded lap back to the lap-times table to pick up the
# position and timing on that lap. Both sides are aliased so the shared column
# names (raceId, driverId) can be referenced without ambiguity.
last_laps = max_laps.alias("max")
lap_rows = df_laptimes.alias("laptimes")

same_race = col("max.raceId") == col("laptimes.raceId")
same_driver = col("max.driverId") == col("laptimes.driverId")
is_last_lap = col("max.last_lap") == col("laptimes.lap")

final_positions_df = last_laps.join(
    lap_rows,
    same_race & same_driver & is_last_lap,
    "inner"
)

# Keep one copy of each key plus the position and timing of the last lap.
final_positions_df = final_positions_df.select(
    col("max.raceId"),
    col("laptimes.driverId"),
    col("max.last_lap"),
    col("laptimes.position"),
    col("laptimes.time"),
    col("laptimes.milliseconds")
)

# Sort on the integer value of raceId, matching the casts used in the earlier
# cells; sorting the raw CSV value puts raceId 1000 before 841.
final_positions_df = final_positions_df.orderBy(
    col("raceId").cast("int").asc(),
    col("last_lap").cast("int").asc(),
)

display(final_positions_df)
