In [0]:
import pandas as pd
from datetime import datetime
from pyspark.sql.functions import avg, countDistinct, to_date, datediff, col, max, row_number, when
from pyspark.sql.window import Window

In [0]:
base_path = "s3://columbia-gr5069-main/raw/"
df_pit = spark.read.option("header", True).csv(base_path + "pit_stops.csv")
df_results = spark.read.option("header", True).csv(base_path + "results.csv")
df_drivers = spark.read.option("header", True).csv(base_path + "drivers.csv")
df_races = spark.read.option("header", True).csv(base_path + "races.csv")
df_lap_times = spark.read.option("header", True).csv(base_path + "lap_times.csv")

In [0]:
# Q1: Average pit stop time per driver per race
avg_pit = df_pit.groupBy("raceId", "driverId") \
                .agg(avg("milliseconds").alias("avg_pit_time"))

print("Q1: Average Pit Stop Time")
avg_pit.show()

Q1: Average Pit Stop Time
+------+--------+------------------+
|raceId|driverId|      avg_pit_time|
+------+--------+------------------+
|   843|      39|           26049.0|
|   844|      67|21976.333333333332|
|   844|       2|          21743.75|
|   844|      20|          20402.25|
|   849|      17|           24416.0|
|   856|      20|           20077.5|
|   869|     819|           18606.0|
|   873|     818|           30198.5|
|   876|      20|           20707.0|
|   878|       3|           21346.0|
|   880|     154|           22302.0|
|   887|       8|           24710.0|
|   909|      18|           20247.0|
|   911|     822|           22972.5|
|   957|     837|           30066.5|
|   958|     822|           21554.5|
|   970|       8|           20569.5|
|   986|       1|           26299.0|
|  1009|     844|           21828.0|
|  1009|     154|           22354.0|
+------+--------+------------------+
only showing top 20 rows



In [0]:
# Q2: Rank by race winner
df_results_selected = df_results.select("raceId", "driverId", "positionOrder")

pit_result = avg_pit.join(df_results_selected, on=["raceId", "driverId"], how="inner")
pit_result_sorted = pit_result.orderBy(["raceId", "positionOrder"])
print("Q2: Average Pit Time Ranked by Position")
pit_result_sorted.show()


Q2: Average Pit Time Ranked by Position
+------+--------+------------+-------------+
|raceId|driverId|avg_pit_time|positionOrder|
+------+--------+------------+-------------+
|  1000|       1|     21480.0|            1|
|  1000|     154|     21733.0|           10|
|  1000|     843|     21831.0|           11|
|  1000|     807|     22308.0|           12|
|  1000|     839|     22258.0|           13|
|  1000|     815|     22561.0|           14|
|  1000|     828|     22640.0|           15|
|  1000|     845|     21509.0|           16|
|  1000|     840|     21291.0|           17|
|  1000|     838|     21732.0|           18|
|  1000|      20|     23111.0|            2|
|  1000|       8|     23150.0|            3|
|  1000|     817|     21364.0|            4|
|  1000|     822|     21337.0|            5|
|  1000|     842|     21684.0|            6|
|  1000|     825|     25126.0|            7|
|  1000|       4|     21795.0|            8|
|  1000|     832|     21914.0|            9|
|  1001|      2

In [0]:
# Q3: Fill missing driver codes
print("\nQ3: Fill missing driver codes")
print("\nQ3: Fill missing driver codes")
from pyspark.sql.functions import expr

# Fill missing 'code' with first 3 letters of surname
df_drivers = df_drivers.withColumn(
    "code",
    when(col("code").isNull(), expr("upper(substr(surname, 1, 3))"))
    .otherwise(col("code"))
)
df_drivers.select("driverId", "surname", "code").show(5)




Q3: Fill missing driver codes

Q3: Fill missing driver codes
+--------+----------+----+
|driverId|   surname|code|
+--------+----------+----+
|       1|  Hamilton| HAM|
|       2|  Heidfeld| HEI|
|       3|   Rosberg| ROS|
|       4|    Alonso| ALO|
|       5|Kovalainen| KOV|
+--------+----------+----+
only showing top 5 rows



In [0]:
# Q4: Youngest and oldest driver per race
df_drivers = df_drivers.withColumn("dob", to_date("dob"))
df_races = df_races.withColumn("date", to_date("date"))

df_age = df_results.join(df_drivers, "driverId") \
                 .join(df_races.select("raceId", "date"), "raceId") \
                 .withColumn("age", datediff(col("date"), col("dob")))

window_young = Window.partitionBy("raceId").orderBy(col("age").asc())
window_old = Window.partitionBy("raceId").orderBy(col("age").desc())

youngest = df_age.withColumn("rn", row_number().over(window_young)).filter("rn = 1")
oldest = df_age.withColumn("rn", row_number().over(window_old)).filter("rn = 1")

print("\nQ4: Youngest drivers per race")
youngest.select("raceId", "driverId", "age").show(5)

print("\nQ4: Oldest drivers per race")
oldest.select("raceId", "driverId", "age").show(5)



Q4: Youngest drivers per race
+------+--------+----+
|raceId|driverId| age|
+------+--------+----+
|     1|      67|7454|
|    10|     153|7065|
|   100|      32|7825|
|  1000|     840|7213|
|  1001|     840|7241|
+------+--------+----+
only showing top 5 rows


Q4: Oldest drivers per race
+------+--------+-----+
|raceId|driverId|  age|
+------+--------+-----+
|     1|      22|13459|
|    10|      22|13578|
|   100|      44|13827|
|  1000|       8|14165|
|  1001|       8|14193|
+------+--------+-----+
only showing top 5 rows



In [0]:
# Q5: Most wins (positionOrder == 1) and most last places
df_results = df_results.withColumn("positionOrder", col("positionOrder").cast("int"))
wins = df_results.filter(col("positionOrder") == 1) \
                .groupBy("driverId") \
                .count() \
                .orderBy(col("count").desc())
print("\nQ5: Drivers with most wins")
wins.show(5)

last_place_per_race = df_results.groupBy("raceId").agg(max("positionOrder").alias("last_pos"))
losses = df_results.join(last_place_per_race,
                         (df_results.raceId == last_place_per_race.raceId) &
                         (df_results.positionOrder == last_place_per_race.last_pos),
                         how="inner") \
                   .groupBy("driverId") \
                   .count() \
                   .orderBy(col("count").desc())
print("\nQ5: Drivers with most last-place finishes")
losses.show(5)


Q5: Drivers with most wins
+--------+-----+
|driverId|count|
+--------+-----+
|       1|  103|
|      30|   91|
|      20|   53|
|     117|   51|
|     830|   45|
+--------+-----+
only showing top 5 rows


Q5: Drivers with most last-place finishes
+--------+-----+
|driverId|count|
+--------+-----+
|     807|   18|
|       8|   16|
|     154|   15|
|      15|   14|
|       4|   14|
+--------+-----+
only showing top 5 rows



In [0]:
# Q6: Extra question: Average number of laps per driver
lap_counts = df_lap_times.groupBy("raceId", "driverId") \
                             .agg(countDistinct("lap").alias("num_laps"))
avg_laps = lap_counts.groupBy("driverId") \
                     .agg(avg("num_laps").alias("avg_laps"))
print("\nQ6: Average number of laps per driver")
avg_laps.show(5)



Q6: Average number of laps per driver
+--------+-----------------+
|driverId|         avg_laps|
+--------+-----------------+
|     829|51.44444444444444|
|     853|             50.4|
|       7|            51.04|
|      51|           41.875|
|      15|51.86610878661088|
+--------+-----------------+
only showing top 5 rows

