In [0]:
# Load the four raw CSV extracts from S3, treating the first row of each file
# as the header. No schema is supplied or inferred, so Spark reads every
# column as a string.
df_races = spark.read.option("header", True).csv('s3://columbia-gr5069-main/raw/races.csv')
df_results = spark.read.option("header", True).csv('s3://columbia-gr5069-main/raw/results.csv')
df_driver = spark.read.option("header", True).csv('s3://columbia-gr5069-main/raw/drivers.csv')
df_pit_stops = spark.read.option("header", True).csv('s3://columbia-gr5069-main/raw/pit_stops.csv')

In [0]:
# Print the first five rows of each raw table, in the order they were loaded.
for raw_frame in (df_races, df_results, df_driver, df_pit_stops):
    raw_frame.show(5)

1. [`10 pts`] What was the average time each driver spent at the pit stop for each race?

In [0]:
from pyspark.sql.functions import col, avg

# One row per (race, driver) pair holding the mean pit-stop duration in
# milliseconds across all of that driver's stops in that race.
pit_stops_by_race_driver = df_pit_stops.groupBy("raceId", "driverId")
avg_pit_time = pit_stops_by_race_driver.agg(
    avg(col("milliseconds")).alias("avg_pit_time_ms")
)

avg_pit_time.show()

2. [`20 pts`] Rank the average time spent at the pit stop in order of who won each race

In [0]:
from pyspark.sql.functions import asc

# Identify the winner of every race (finishing position 1).
winners = df_results.filter(col("positionOrder") == "1") \
    .select("raceId", "driverId").withColumnRenamed("driverId", "winnerId")

# Finishing position of every driver in every race. The CSVs are read without
# a schema, so positionOrder is a string; cast it to int so that sorting is
# numeric (1, 2, ..., 10) rather than lexicographic ("1", "10", "2", ...).
finishing_order = df_results.select(
    "raceId",
    "driverId",
    col("positionOrder").cast("int").alias("positionOrder"),
)

# Attach each driver's finishing position and the race winner to the
# per-race, per-driver averages from question 1. The left join keeps drivers
# that have pit-stop rows but no matching result row (position is then null).
avg_pit_with_winner = (
    avg_pit_time
    .join(finishing_order, on=["raceId", "driverId"], how="left")
    .join(winners, on="raceId")
)

# Rank within each race by finishing order: the winner's average pit time
# comes first, followed by second place, and so on. Sorting by winnerId alone
# (as before) only grouped rows by a string id and did not rank anything.
avg_pit_with_winner.orderBy(
    col("raceId").cast("int").asc(),
    col("positionOrder").asc_nulls_last(),
).show()

3. [`20 pts`] Insert the missing code (e.g., ALO for Alonso) for drivers based on the 'drivers' dataset

In [0]:
from pyspark.sql.functions import col, regexp_replace, substring, upper, when

# Some drivers have no code; fill those in instead of carrying the gap forward.
# NOTE(review): assumes missing codes appear as null, an empty string, or the
# literal "\N" placeholder, and that the drivers table has a `surname`
# column -- confirm against drivers.csv.
code_is_missing = (
    col("code").isNull() | (col("code") == "") | (col("code") == r"\N")
)

# Derived code: first three letters of the surname, whitespace removed and
# upper-cased (e.g. "Alonso" -> "ALO").
derived_code = upper(substring(regexp_replace(col("surname"), r"\s+", ""), 1, 3))

df_driver_code = df_driver.select(
    "driverId",
    when(code_is_missing, derived_code).otherwise(col("code")).alias("code"),
)

# Attach the (now complete) driver code to the average pit-stop times.
avg_pit_with_code = avg_pit_time.join(df_driver_code, on="driverId", how="left")
avg_pit_with_code.show()

4. [`20 pts`] Who is the youngest and oldest driver for each race? Create a new column called “Age”

In [0]:
# `max` must be imported here: without it, a fresh Restart & Run All resolves
# `max("age")` to Python's builtin, which returns a str and fails on `.alias`.
# Note that these imports shadow the Python builtins `min` and `max`.
from pyspark.sql.functions import datediff, col, min, max

# Keep only the columns needed to compute an age.
driver_bob = df_driver.select("driverId", "dob")
races_date = df_races.select("raceId", "date")

# Age of every driver at every race they have a result row for.
# `age` is the number of days between the race date and the date of birth.
results_with_age = (
    df_results.select("raceId", "driverId")
    .join(races_date, on="raceId")
    .join(driver_bob, on="driverId")
    .withColumn("age", datediff(col("date"), col("dob")))
)

# Smallest and largest age observed in each race.
youngest = results_with_age.groupBy("raceId").agg(min("age").alias("min_age"))
oldest = results_with_age.groupBy("raceId").agg(max("age").alias("max_age"))

# Join the per-race extremes back onto the per-driver ages to recover which
# driver(s) hold them; ties yield more than one row per race.
youngest_drivers = results_with_age.alias("a").join(
    youngest.alias("b"),
    (col("a.raceId") == col("b.raceId")) &
    (col("a.age") == col("b.min_age"))
).select(
    col("a.raceId"),
    col("a.driverId"),
    col("a.age")
)

oldest_drivers = results_with_age.alias("a").join(
    oldest.alias("b"),
    (col("a.raceId") == col("b.raceId")) &
    (col("a.age") == col("b.max_age"))
).select(
    col("a.raceId"),
    col("a.driverId"),
    col("a.age")
)

# Show the results.
youngest_drivers.show(5)
oldest_drivers.show(5)

5. [`20 pts`] For a given race, which driver has the most wins and losses?

In [0]:
from pyspark.sql.functions import col, count, max, when

# Number of wins and losses for each driver in each race.
# `count` counts non-null values, so counting a boolean column directly would
# count every row whether it is True or False. Wrapping the flag in `when`
# (no `otherwise`) turns False into null, so only true flags are counted.
results_with_wins_losses = df_results.withColumn(
    "win", col("positionOrder") == 1
).withColumn(
    "loss", col("positionOrder") != 1
).groupBy("raceId", "driverId").agg(
    count(when(col("win"), True)).alias("wins"),
    count(when(col("loss"), True)).alias("losses")
)

# Highest win count within each race.
most_wins = results_with_wins_losses.groupBy("raceId").agg(
    max("wins").alias("max_wins")
)

# Highest loss count within each race.
most_losses = results_with_wins_losses.groupBy("raceId").agg(
    max("losses").alias("max_losses")
)

# Join the per-race maxima back onto the per-driver counts to find which
# driver(s) hold them; ties yield more than one driver per race.
most_wins_drivers = results_with_wins_losses.alias("a").join(
    most_wins.alias("b"),
    on=[col("a.raceId") == col("b.raceId"), col("a.wins") == col("b.max_wins")]
).select(
    col("a.raceId"),
    col("a.driverId"),
    col("a.wins")
).dropDuplicates(["raceId", "driverId"])

most_losses_drivers = results_with_wins_losses.alias("a").join(
    most_losses.alias("b"),
    on=[col("a.raceId") == col("b.raceId"), col("a.losses") == col("b.max_losses")]
).select(
    col("a.raceId"),
    col("a.driverId"),
    col("a.losses")
).dropDuplicates(["raceId", "driverId"])

# Show the results.
most_wins_drivers.show(5)
most_losses_drivers.show(5)

6. Select the winning driver (the champion) of each race from the race results.

In [0]:
# Race winners: the result rows whose finishing position is 1, reduced to the
# race id and the winning driver's id (exposed as `winnerId`).
finished_first = col("positionOrder") == "1"
winners = df_results.where(finished_first).select(
    "raceId",
    col("driverId").alias("winnerId"),
)

winners.show(5)