In [0]:
from pyspark.sql.functions import avg
from pyspark.sql.functions import avg, rank
from pyspark.sql.window import Window
from pyspark.sql.functions import expr, upper, substring, when, col, count
from pyspark.sql.functions import col, year, datediff, current_date
from pyspark.sql.functions import min, max
from pyspark.sql import functions as F

# Question 1: What was the average time each driver spent at the pit stop for each race?

In [0]:
# Load the raw pit-stop records from S3. The first CSV row is treated as the
# header and column types are inferred by Spark rather than declared.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("s3://columbia-gr5069-main/raw/pit_stops.csv")
)
# Preview the first rows (Spark's default row count).
df.show()

In [0]:
# One row per (raceId, driverId) holding the mean of that driver's pit-stop
# `duration` values in that race.
# NOTE(review): the dtype of `duration` comes from inferSchema; if it was
# inferred as a string, values that do not parse as numbers are presumably
# left out of the average — confirm with df.printSchema().
pitstops_by_race_driver = df.groupBy("raceId", "driverId")
avg_pitstop_time = pitstops_by_race_driver.agg(
    F.avg("duration").alias("avg_pitstop_time")
)
avg_pitstop_time.show(50)

# Question 2: Rank the average time spent at the pit stop in order of who won each race

In [0]:
# Load the race results from S3 (header row, inferred types, multi-line
# records allowed) and keep only the columns used by the later questions.
df1 = spark.read.csv(
    "s3://columbia-gr5069-main/raw/results.csv",
    header=True,
    inferSchema=True,
    multiLine=True,
).select("raceId", "driverId", "positionOrder")

df1.show(30)

In [0]:
# Keep only the rows whose finishing order is 1, i.e. each race's winner.
is_winner = F.col("positionOrder") == 1
winners_df = df1.where(is_winner).select("raceId", "driverId", "positionOrder")

# Attach each winner's average pit-stop time. The inner join drops winners
# that have no matching (raceId, driverId) row in avg_pitstop_time.
winners_pitstop = winners_df.join(avg_pitstop_time, ["raceId", "driverId"], "inner")

# Rank all winners against each other, shortest average pit stop first.
# NOTE(review): the window has no partitionBy, so the ranking is global across
# races rather than within a race — confirm this is the intended reading of
# the question.
window_spec = Window.orderBy(F.col("avg_pitstop_time"))
ranked_winners = winners_pitstop.withColumn("rank", F.rank().over(window_spec))
ranked_winners.show()

# Question 3: Insert the missing code (e.g., ALO for Alonso) for drivers based on the 'drivers' dataset

In [0]:
# Load the drivers dataset from S3, using the first row as the header and
# letting Spark infer the column types.
df2 = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("s3://columbia-gr5069-main/raw/drivers.csv")
)
df2.show(50)

In [0]:
# Fill in missing driver codes with the first three letters of the surname,
# upper-cased (e.g. "Alonso" -> "ALO"). Existing codes are left untouched.

# A code counts as missing when it is null, the "\N" placeholder, or blank.
code_is_missing = (
    col("code").isNull()
    | (col("code") == "\\N")
    | (F.trim(col("code")) == "")
)

# Strip everything that is not a letter before taking the prefix, so surnames
# containing spaces or punctuation ("de Vries", "O'Brien") yield a clean
# three-letter code ("DEV", "OBR") instead of one with a space or apostrophe.
surname_letters = F.regexp_replace(col("surname"), "[^\\p{L}]", "")
derived_code = upper(substring(surname_letters, 1, 3))

df2 = df2.withColumn(
    "code",
    when(code_is_missing, derived_code).otherwise(col("code")),
)
df2.show(50)

# Question 4: Who is the youngest and oldest driver for each race? Create a new column called “Age”

In [0]:
# Convert the date of birth to a date type so date arithmetic can be applied.
df2 = df2.withColumn("dob", col("dob").cast("date"))

# Age in completed years as of today. Subtracting calendar years alone would
# overstate the age by one for every driver whose birthday has not yet
# occurred this year, so count whole months elapsed and divide by 12 instead.
# NOTE(review): this is the age on the day the notebook runs, not the age on
# race day; the race dates are not loaded in this notebook.
months_since_birth = F.months_between(current_date(), col("dob"))
df2 = df2.withColumn("Age", F.floor(months_since_birth / 12).cast("int"))
df2.show(50)

In [0]:
# Attach driver attributes (including Age) to every race result row.
df_with_race = df1.join(df2, on="driverId", how="left")

# Per-race minimum and maximum Age, repeated on every row of that race.
# F.min / F.max are used explicitly so the cell does not depend on the
# module-level `min` / `max` names that shadow the Python builtins.
window_spec = Window.partitionBy("raceId")
df_age = (
    df_with_race
    .withColumn("youngest_age", F.min(F.col("Age")).over(window_spec))
    .withColumn("oldest_age", F.max(F.col("Age")).over(window_spec))
)

is_youngest = F.col("Age") == F.col("youngest_age")
is_oldest = F.col("Age") == F.col("oldest_age")

# Keep only the youngest and oldest driver(s) of each race and label which is
# which; without the label the output cannot answer the question. Drivers
# tied on Age are all kept. If every driver in a race has the same Age, the
# rows are labelled "Youngest".
df_new = (
    df_age
    .filter(is_youngest | is_oldest)
    .withColumn(
        "age_group",
        F.when(is_youngest, F.lit("Youngest")).otherwise(F.lit("Oldest")),
    )
)

# Order by race so each race's youngest/oldest pair is shown together.
df_new.select("raceId", "driverId", "Age", "age_group") \
      .orderBy("raceId", "Age") \
      .show(10)

# Question 5: For a given race, which driver has the most wins and losses?

In [0]:
# Question 5
# Race results enriched with driver attributes; results without a matching
# driver row are kept (left join).
df_results_with_drivers = df1.join(df2, "driverId", "left")

# Columns reported for both the winner and the loser tables.
reported_columns = ["raceId", "driverId", "code"]

# "Winner" of a race: the row whose positionOrder is 1.
finished_first = F.col("positionOrder") == 1
most_wins = df_results_with_drivers.where(finished_first).select(
    *reported_columns, F.lit("Winner").alias("status")
)

# "Loser" of a race: the row(s) holding the largest positionOrder in that race.
window_spec1 = Window.partitionBy("raceId")
with_last_position = df_results_with_drivers.withColumn(
    "max_position", F.max(F.col("positionOrder")).over(window_spec1)
)
finished_last = F.col("positionOrder") == F.col("max_position")
most_losses = with_last_position.where(finished_last).select(
    *reported_columns, F.lit("Loser").alias("status")
)

most_wins.show()
most_losses.show()

# Question 6: Continue exploring the data by answering your own question. (Which driver has the most wins across all races?)

In [0]:
# Count how many race-winning rows each driver has.
win_counter = count("*").alias("win_count")
driver_wins = winners_df.groupBy("driverId").agg(win_counter)

# Add the driver's code and name so the result is readable; drivers without a
# matching row in df2 are still kept (left join).
driver_names = df2.select("driverId", "code", "forename", "surname")
most_wins = driver_wins.join(driver_names, "driverId", "left")

# Show the drivers with the most wins first.
most_wins.orderBy(F.desc("win_count")).show()