In [0]:
pit_stops_df = spark.read.csv("s3://columbia-gr5069-main/raw/pit_stops.csv", header=True)
drivers_df = spark.read.csv("s3://columbia-gr5069-main/raw/drivers.csv", header=True)

In [0]:
# 1. [10 pts] Calculate the average time each driver spent at the pit stop per race
from pyspark.sql import functions as F

avg_pit_stop_time = (
    pit_stops_df.groupBy("raceId", "driverId")
    .agg(F.avg("milliseconds").alias("avg_pit_stop_time"))
)
avg_pit_stop_time.show()

+------+--------+------------------+
|raceId|driverId| avg_pit_stop_time|
+------+--------+------------------+
|   843|      39|           26049.0|
|   844|      67|21976.333333333332|
|   844|       2|          21743.75|
|   844|      20|          20402.25|
|   849|      17|           24416.0|
|   856|      20|           20077.5|
|   869|     819|           18606.0|
|   873|     818|           30198.5|
|   876|      20|           20707.0|
|   878|       3|           21346.0|
|   880|     154|           22302.0|
|   887|       8|           24710.0|
|   909|      18|           20247.0|
|   911|     822|           22972.5|
|   957|     837|           30066.5|
|   958|     822|           21554.5|
|   970|       8|           20569.5|
|   986|       1|           26299.0|
|  1009|     844|           21828.0|
|  1009|     154|           22354.0|
+------+--------+------------------+
only showing top 20 rows



In [0]:
#2.  [20 pts] Rank the average pit stop time based on race winners
# Import necessary libraries
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Get the driver with the shortest pit stop time (potential winner) for each race
race_winners: DataFrame = (
    pit_stops_df.orderBy("raceId", "milliseconds")  # Sort by race and pit stop time
    .groupBy("raceId")
    .agg(F.first("driverId").alias("winner_driverId"))  # Get the driver with the fastest pit stop
)

# Rank the average pit stop time
avg_pit_stop_time: DataFrame = avg_pit_stop_time.withColumn(
    "Rank", F.rank().over(Window.partitionBy("raceId").orderBy("avg_pit_stop_time"))
)

# Handling Drivers Who Did Not Finish the Race:
# Drivers who did not finish may not have pit stop records.
# Using an INNER JOIN ensures we only include drivers who actually completed the race.
# If you want to include all drivers, you can change the join type to 'left'.

# Join the dataframes and display the result sorted by 'raceId' and 'Rank'
joined_df: DataFrame = avg_pit_stop_time.join(race_winners, on=["raceId"], how="inner")
display(joined_df.orderBy(["raceId", "Rank"]))

raceId,driverId,avg_pit_stop_time,Rank,winner_driverId
1000,840,21291.0,1,840
1000,822,21337.0,2,840
1000,817,21364.0,3,840
1000,1,21480.0,4,840
1000,845,21509.0,5,840
1000,842,21684.0,6,840
1000,838,21732.0,7,840
1000,154,21733.0,8,840
1000,4,21795.0,9,840
1000,843,21831.0,10,840


In [0]:
%python
# [20 pts] Insert missing code for drivers
from pyspark.sql import functions as F
from itertools import chain

# Create a dictionary from the DataFrame
code_map = dict(zip(
    drivers_df.select('surname').toPandas()['surname'], 
    drivers_df.select('code').toPandas()['code']
))

# Create a mapping DataFrame
mapping_expr = F.create_map([F.lit(x) for x in chain(*code_map.items())])

# Use PySpark `when` function to fill in missing values
drivers_df = drivers_df.withColumn(
    "code",
    F.when(F.col("code").isNotNull(), F.col("code"))  # Keep existing values
     .otherwise(mapping_expr[F.col("surname")])  # Fill missing using dictionary
)

# Show updated DataFrame
display(drivers_df)

driverId,driverRef,number,code,forename,surname,dob,nationality,url
1,hamilton,44,HAM,Lewis,Hamilton,1985-01-07,British,http://en.wikipedia.org/wiki/Lewis_Hamilton
2,heidfeld,\N,HEI,Nick,Heidfeld,1977-05-10,German,http://en.wikipedia.org/wiki/Nick_Heidfeld
3,rosberg,6,ROS,Nico,Rosberg,1985-06-27,German,http://en.wikipedia.org/wiki/Nico_Rosberg
4,alonso,14,ALO,Fernando,Alonso,1981-07-29,Spanish,http://en.wikipedia.org/wiki/Fernando_Alonso
5,kovalainen,\N,KOV,Heikki,Kovalainen,1981-10-19,Finnish,http://en.wikipedia.org/wiki/Heikki_Kovalainen
6,nakajima,\N,NAK,Kazuki,Nakajima,1985-01-11,Japanese,http://en.wikipedia.org/wiki/Kazuki_Nakajima
7,bourdais,\N,BOU,Sébastien,Bourdais,1979-02-28,French,http://en.wikipedia.org/wiki/S%C3%A9bastien_Bourdais
8,raikkonen,7,RAI,Kimi,Räikkönen,1979-10-17,Finnish,http://en.wikipedia.org/wiki/Kimi_R%C3%A4ikk%C3%B6nen
9,kubica,88,KUB,Robert,Kubica,1984-12-07,Polish,http://en.wikipedia.org/wiki/Robert_Kubica
10,glock,\N,GLO,Timo,Glock,1982-03-18,German,http://en.wikipedia.org/wiki/Timo_Glock


In [0]:
# 4. [20 pts] Find the youngest and oldest driver per race
from pyspark.sql import functions as F
from pyspark.sql import Window

# Step 1: Add 'Age' column to 'drivers_with_age' DataFrame
# Assuming 'drivers_with_age' already has 'raceId', 'driverId', 'dob', 'date'
drivers_with_age = drivers_with_age.withColumn(
    "Age", F.year("date") - F.year("dob")
)

# Step 2: Define the Window Specification
window_spec = Window.partitionBy("raceId").orderBy("Age")

# Step 3: Identify the youngest and oldest drivers
# Using rank() to properly handle ties
youngest = drivers_with_age.withColumn("youngest_rank", F.rank().over(window_spec)) \
                           .filter(F.col("youngest_rank") == 1) \
                           .select("raceId", "driverId", "Age")

oldest = drivers_with_age.withColumn("oldest_rank", F.rank().over(window_spec.orderBy(F.col("Age").desc()))) \
                         .filter(F.col("oldest_rank") == 1) \
                         .select("raceId", "driverId", "Age")

# Step 4: Display the Results
display(youngest.orderBy("raceId"))
display(oldest.orderBy("raceId"))


raceId,driverId,Age
1,67,21
2,67,21
3,67,21
4,67,21
5,67,21
6,67,21
7,67,21
8,67,21
9,67,21
10,153,19


raceId,driverId,Age
1,22,37
2,22,37
3,22,37
4,22,37
5,22,37
6,22,37
7,22,37
8,22,37
9,22,37
10,22,37


In [0]:
# 5. For a given race, which driver has the most wins and losses?
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Step 1: Combine results with race dates
results_with_dates = results_df.join(races_df, on="raceId", how="inner") \
                               .select("raceId", "driverId", "positionOrder", "date")

# Step 2: Define a window to calculate cumulative wins & losses before each race
window_spec = Window.partitionBy("driverId").orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Step 3: Calculate cumulative wins and losses for each driver before each race
driver_stats = results_with_dates.withColumn(
    "Cumulative_Wins",
    F.sum(F.when(F.col("positionOrder") == 1, 1).otherwise(0)).over(window_spec)
).withColumn(
    "Cumulative_Losses",
    F.sum(F.when((F.col("positionOrder") != 1) & (F.col("positionOrder").isNotNull()), 1).otherwise(0)).over(window_spec)
)

# Step 4: For each race, find the driver with the most cumulative wins and most cumulative losses
# Using aliases to differentiate between DataFrames
driver_stats_alias = driver_stats.alias("ds")

most_wins_per_race = driver_stats.groupBy("raceId").agg(
    F.max("Cumulative_Wins").alias("Max_Wins")
).alias("mw").join(driver_stats_alias,
                   (driver_stats_alias["Cumulative_Wins"] == F.col("mw.Max_Wins")) & 
                   (driver_stats_alias["raceId"] == F.col("mw.raceId"))
).select("ds.raceId", "ds.driverId", "ds.Cumulative_Wins", "ds.Cumulative_Losses")

most_losses_per_race = driver_stats.groupBy("raceId").agg(
    F.max("Cumulative_Losses").alias("Max_Losses")
).alias("ml").join(driver_stats_alias,
                   (driver_stats_alias["Cumulative_Losses"] == F.col("ml.Max_Losses")) & 
                   (driver_stats_alias["raceId"] == F.col("ml.raceId"))
).select("ds.raceId", "ds.driverId", "ds.Cumulative_Wins", "ds.Cumulative_Losses")

# Step 5: Display the Results
display(most_wins_per_race.orderBy("raceId"))
display(most_losses_per_race.orderBy("raceId"))


raceId,driverId,Cumulative_Wins,Cumulative_Losses
1,4,21,103
2,4,21,104
3,4,21,105
4,4,21,106
5,4,21,107
6,4,21,108
7,4,21,109
8,4,21,110
9,4,21,111
10,4,21,112


raceId,driverId,Cumulative_Wins,Cumulative_Losses
1,22,9,263
2,22,9,264
3,22,9,265
4,22,9,266
5,22,9,267
6,22,9,268
7,22,9,269
8,22,9,270
9,22,9,271
10,22,9,272


In [0]:
#6. [10 pts] Continue exploring the data by answering your own question.
# How many races has each driver participated in?
from pyspark.sql import SparkSession
from pyspark.sql.functions import count

# Initialize Spark session
spark = SparkSession.builder.appName("F1Analysis").getOrCreate()

# Load race results dataset
results_df = spark.read.csv("s3://columbia-gr5069-main/raw/results.csv", header=True, inferSchema=True)

# Count races per driver
driver_race_count = results_df.groupBy("driverId").agg(
    count("raceId").alias("total_races")
)

# Show drivers with the most and least race participations
print("Top 10 Drivers with Most Race Participations:")
driver_race_count.orderBy("total_races", ascending=False).show(10)

print("Top 10 Drivers with Least Race Participations:")
driver_race_count.orderBy("total_races", ascending=True).show(10)


Top 10 Drivers with Most Race Participations:
+--------+-----------+
|driverId|total_races|
+--------+-----------+
|       4|        370|
|       8|        352|
|      22|        326|
|       1|        322|
|      18|        309|
|      30|        308|
|      20|        300|
|      13|        271|
|     119|        257|
|      15|        256|
+--------+-----------+
only showing top 10 rows

Top 10 Drivers with Least Race Participations:
+--------+-----------+
|driverId|total_races|
+--------+-----------+
|      28|          1|
|     707|          1|
|     497|          1|
|     737|          1|
|     587|          1|
|     451|          1|
|     384|          1|
|     796|          1|
|     743|          1|
|     472|          1|
+--------+-----------+
only showing top 10 rows

