In [0]:
# Import necessary PySpark SQL functions and Window functionality
# - col: For column references and operations
# - year, current_date: For date manipulations
# - avg, min, max, count: Aggregate functions for statistical analysis
# - when: For conditional logic (similar to IF-THEN-ELSE)
# - rank, dense_rank: Window functions for ranking results
# - Window: For defining partitioning and ordering in window operations
from pyspark.sql.functions import col, year, current_date, avg, min, max, count, when, rank, dense_rank, lit, concat, datediff, substring, upper, stddev, sum
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType, DateType

In [0]:
#Read in the two datasets that are necesary for this take home assignment
df_laptimes = spark.read.csv('s3://columbia-gr5069-main/raw/lap_times.csv', header=True)
df_drivers = spark.read.csv('s3://columbia-gr5069-main/raw/drivers.csv', header=True)
df_races = spark.read.csv('s3://columbia-gr5069-main/raw/races.csv', header=True)
df_results = spark.read.csv('s3://columbia-gr5069-main/raw/results.csv', header=True)
df_pit_stops = spark.read.csv('s3://columbia-gr5069-main/raw/pit_stops.csv', header=True)
df_status = spark.read.csv('s3://columbia-gr5069-main/raw/status.csv', header=True)
df_qualifying = spark.read.csv("s3://columbia-gr5069-main/raw/qualifying.csv", header=True, inferSchema=True)



In [0]:
#Observing each table 

print("Lap Times Schema:")
df_laptimes.printSchema()

print("Drivers Schema:")
df_drivers.printSchema()

print("Races Schema:")
df_races.printSchema()

print("Results Schema:")
df_results.printSchema()

print("Pit Stops Schema:")
df_pit_stops.printSchema()

print("Status Schema:")
df_status.printSchema()

print("Qualifying Schema:")
df_qualifying.printSchema()

# Display a sample of each dataset
display(df_laptimes.limit(3))
display(df_drivers.limit(3))
display(df_races.limit(3))
display(df_results.limit(3))
display(df_pit_stops.limit(3))
display(df_status.limit(3))
display(df_qualifying.limit(3))

Lap Times Schema:
root
 |-- raceId: string (nullable = true)
 |-- driverId: string (nullable = true)
 |-- lap: string (nullable = true)
 |-- position: string (nullable = true)
 |-- time: string (nullable = true)
 |-- milliseconds: string (nullable = true)

Drivers Schema:
root
 |-- driverId: string (nullable = true)
 |-- driverRef: string (nullable = true)
 |-- number: string (nullable = true)
 |-- code: string (nullable = true)
 |-- forename: string (nullable = true)
 |-- surname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- nationality: string (nullable = true)
 |-- url: string (nullable = true)

Races Schema:
root
 |-- raceId: string (nullable = true)
 |-- year: string (nullable = true)
 |-- round: string (nullable = true)
 |-- circuitId: string (nullable = true)
 |-- name: string (nullable = true)
 |-- date: string (nullable = true)
 |-- time: string (nullable = true)
 |-- url: string (nullable = true)
 |-- fp1_date: string (nullable = true)
 |-- fp1_time: strin

raceId,driverId,lap,position,time,milliseconds
841,20,1,1,1:38.109,98109
841,20,2,1,1:33.006,93006
841,20,3,1,1:32.713,92713


driverId,driverRef,number,code,forename,surname,dob,nationality,url
1,hamilton,44,HAM,Lewis,Hamilton,1985-01-07,British,http://en.wikipedia.org/wiki/Lewis_Hamilton
2,heidfeld,\N,HEI,Nick,Heidfeld,1977-05-10,German,http://en.wikipedia.org/wiki/Nick_Heidfeld
3,rosberg,6,ROS,Nico,Rosberg,1985-06-27,German,http://en.wikipedia.org/wiki/Nico_Rosberg


raceId,year,round,circuitId,name,date,time,url,fp1_date,fp1_time,fp2_date,fp2_time,fp3_date,fp3_time,quali_date,quali_time,sprint_date,sprint_time
1,2009,1,1,Australian Grand Prix,2009-03-29,06:00:00,http://en.wikipedia.org/wiki/2009_Australian_Grand_Prix,\N,\N,\N,\N,\N,\N,\N,\N,\N,\N
2,2009,2,2,Malaysian Grand Prix,2009-04-05,09:00:00,http://en.wikipedia.org/wiki/2009_Malaysian_Grand_Prix,\N,\N,\N,\N,\N,\N,\N,\N,\N,\N
3,2009,3,17,Chinese Grand Prix,2009-04-19,07:00:00,http://en.wikipedia.org/wiki/2009_Chinese_Grand_Prix,\N,\N,\N,\N,\N,\N,\N,\N,\N,\N


resultId,raceId,driverId,constructorId,number,grid,position,positionText,positionOrder,points,laps,time,milliseconds,fastestLap,rank,fastestLapTime,fastestLapSpeed,statusId
1,18,1,1,22,1,1,1,1,10,58,1:34:50.616,5690616,39,2,1:27.452,218.3,1
2,18,2,2,3,5,2,2,2,8,58,+5.478,5696094,41,3,1:27.739,217.586,1
3,18,3,3,7,7,3,3,3,6,58,+8.163,5698779,41,5,1:28.090,216.719,1


raceId,driverId,stop,lap,time,duration,milliseconds
841,153,1,1,17:05:23,26.898,26898
841,30,1,1,17:05:52,25.021,25021
841,17,1,11,17:20:48,23.426,23426


statusId,status
1,Finished
2,Disqualified
3,Accident


qualifyId,raceId,driverId,constructorId,number,position,q1,q2,q3
1,18,1,1,22,1,1:26.572,1:25.187,1:26.714
2,18,9,2,4,2,1:26.103,1:25.315,1:26.869
3,18,5,1,23,3,1:25.664,1:25.452,1:27.079


In [0]:
#Question 1: What was the average time each driver spent at the pit stop for each race?

# Calculate the average pit stop time per driver per race
avg_pit_times = df_pit_stops.groupBy("raceId", "driverId") \
                           .agg(avg("milliseconds").alias("avg_pit_time_ms")) \
                           .withColumn("avg_pit_time_seconds", col("avg_pit_time_ms") / 1000)

# Join with driver names for better readability
pit_stops_with_names = avg_pit_times.join(
    df_drivers.select("driverId", "forename", "surname"),
    on="driverId"
).withColumn("driver_name", concat(col("forename"), lit(" "), col("surname")))

# Join with race names
final_result = pit_stops_with_names.join(
    df_races.select("raceId", "name", "year"),
    on="raceId"
)

# Select and order the columns for display
q1_result = final_result.select(
    "year", "name", "driver_name", "avg_pit_time_seconds"
).orderBy("year", "name", "avg_pit_time_seconds")

# Display the results
display(q1_result)


year,name,driver_name,avg_pit_time_seconds
2011,Abu Dhabi Grand Prix,Pastor Maldonado,16.549
2011,Abu Dhabi Grand Prix,Bruno Senna,18.057
2011,Abu Dhabi Grand Prix,Lewis Hamilton,19.3945
2011,Abu Dhabi Grand Prix,Nico Rosberg,19.846
2011,Abu Dhabi Grand Prix,Michael Schumacher,20.149
2011,Abu Dhabi Grand Prix,Paul di Resta,20.216
2011,Abu Dhabi Grand Prix,Felipe Massa,20.253
2011,Abu Dhabi Grand Prix,Vitaly Petrov,20.353
2011,Abu Dhabi Grand Prix,Adrian Sutil,20.485
2011,Abu Dhabi Grand Prix,Jenson Button,20.6045


In [0]:
#Rank the average time spent at the pit stop in order of who won each race

# Question 2: Rank the average time spent at the pit stop in order of who won each race

from pyspark.sql.functions import col, avg, concat, lit, rank
from pyspark.sql.window import Window

# Identify race winners (position = 1)
race_winners = df_results.filter(col("position") == 1) \
                        .select("raceId", "driverId") \
                        .withColumnRenamed("driverId", "winner_id")

# Calculate average pit stop time for each driver in each race
avg_pit_times = df_pit_stops.groupBy("raceId", "driverId") \
                           .agg(avg("milliseconds").alias("avg_pit_time_ms")) \
                           .withColumn("avg_pit_time_seconds", col("avg_pit_time_ms") / 1000)

# Join with race winners data
pit_times_with_winners = avg_pit_times.join(race_winners, on="raceId")

# Join with driver information for names
pit_times_with_names = pit_times_with_winners.join(
    df_drivers.select("driverId", "forename", "surname"),
    on="driverId"
).withColumn("driver_name", concat(col("forename"), lit(" "), col("surname")))

# Create a window function to rank pit stop times within each race
window_spec = Window.partitionBy("raceId").orderBy("avg_pit_time_ms")
ranked_pit_times = pit_times_with_names.withColumn("pit_time_rank", rank().over(window_spec))

# Mark the race winner
ranked_pit_times = ranked_pit_times.withColumn(
    "is_winner", 
    when(col("driverId") == col("winner_id"), lit("Race Winner")).otherwise(lit(""))
)

# Join with race information
final_result = ranked_pit_times.join(
    df_races.select("raceId", "name", "year"),
    on="raceId"
)

# Select and order columns for display
q2_result = final_result.select(
    "year", 
    "name", 
    "driver_name", 
    "avg_pit_time_seconds", 
    "pit_time_rank", 
    "is_winner"
).orderBy("year", "name", "pit_time_rank")

# Display the results
display(q2_result)

# Add a note about drivers who didn't finish or make pit stops
print("Note: Drivers who didn't finish the race or didn't make pit stops are not included in this analysis.")

year,name,driver_name,avg_pit_time_seconds,pit_time_rank,is_winner
2011,Abu Dhabi Grand Prix,Pastor Maldonado,16.549,1,
2011,Abu Dhabi Grand Prix,Bruno Senna,18.057,2,
2011,Abu Dhabi Grand Prix,Lewis Hamilton,19.3945,3,Race Winner
2011,Abu Dhabi Grand Prix,Nico Rosberg,19.846,4,
2011,Abu Dhabi Grand Prix,Michael Schumacher,20.149,5,
2011,Abu Dhabi Grand Prix,Paul di Resta,20.216,6,
2011,Abu Dhabi Grand Prix,Felipe Massa,20.253,7,
2011,Abu Dhabi Grand Prix,Vitaly Petrov,20.353,8,
2011,Abu Dhabi Grand Prix,Adrian Sutil,20.485,9,
2011,Abu Dhabi Grand Prix,Jenson Button,20.6045,10,


Note: Drivers who didn't finish the race or didn't make pit stops are not included in this analysis.


In [0]:
# Question 3: Insert the missing code (e.g: ALO for Alonso) for drivers based on the 'drivers' dataset

# Question 3: Insert the missing code for drivers based on the 'drivers' dataset

from pyspark.sql.functions import col, upper, substring, when, lit

# Check for drivers with missing codes 
# Let's try a different approach to identify missing codes
missing_codes = df_drivers.filter(
    (col("code").isNull()) | 
    (col("code") == "") | 
    (col("code") == "\\N") |  # This might be how NULL is represented in your data
    (col("code") == "N")
)

# Display drivers with missing codes
print("Drivers with missing codes:")
display(missing_codes)

# Count how many drivers are missing codes
missing_code_count = missing_codes.count()
print(f"Number of drivers missing codes: {missing_code_count}")

# Generate three-letter codes for ALL drivers
df_drivers_with_codes = df_drivers.withColumn(
    "generated_code",
    when((col("code").isNull()) | (col("code") == "") | (col("code") == "\\N") | (col("code") == "N"),
         upper(substring(col("surname"), 1, 3))
    ).otherwise(col("code"))
)

# Display all drivers with their original and generated codes
display(df_drivers_with_codes.select(
    "driverId", 
    "forename", 
    "surname", 
    "code", 
    "generated_code"
).orderBy("driverId"))

# Now let's specifically check which drivers got NEW codes
drivers_with_new_codes = df_drivers_with_codes.filter(
    col("code") != col("generated_code")
)

print("Drivers who received newly generated codes:")
display(drivers_with_new_codes.select(
    "driverId", 
    "forename", 
    "surname", 
    "code", 
    "generated_code"
).orderBy("driverId"))

Drivers with missing codes:


driverId,driverRef,number,code,forename,surname,dob,nationality,url
43,matta,\N,\N,Cristiano,da Matta,1973-09-19,Brazilian,http://en.wikipedia.org/wiki/Cristiano_da_Matta
44,panis,\N,\N,Olivier,Panis,1966-09-02,French,http://en.wikipedia.org/wiki/Olivier_Panis
45,pantano,\N,\N,Giorgio,Pantano,1979-02-04,Italian,http://en.wikipedia.org/wiki/Giorgio_Pantano
46,bruni,\N,\N,Gianmaria,Bruni,1981-05-30,Italian,http://en.wikipedia.org/wiki/Gianmaria_Bruni
47,baumgartner,\N,\N,Zsolt,Baumgartner,1981-01-01,Hungarian,http://en.wikipedia.org/wiki/Zsolt_Baumgartner
48,gene,\N,\N,Marc,Gené,1974-03-29,Spanish,http://en.wikipedia.org/wiki/Marc_Gen%C3%A9
49,frentzen,\N,\N,Heinz-Harald,Frentzen,1967-05-18,German,http://en.wikipedia.org/wiki/Heinz-Harald_Frentzen
50,verstappen,\N,\N,Jos,Verstappen,1972-03-04,Dutch,http://en.wikipedia.org/wiki/Jos_Verstappen
51,wilson,\N,\N,Justin,Wilson,1978-07-31,British,http://en.wikipedia.org/wiki/Justin_Wilson_(racing_driver)
52,firman,\N,\N,Ralph,Firman,1975-05-20,Irish,http://en.wikipedia.org/wiki/Ralph_Firman


Number of drivers missing codes: 757


driverId,forename,surname,code,generated_code
1,Lewis,Hamilton,HAM,HAM
10,Timo,Glock,GLO,GLO
100,Érik,Comas,\N,COM
101,David,Brabham,\N,BRA
102,Ayrton,Senna,\N,SEN
103,Éric,Bernard,\N,BER
104,Christian,Fittipaldi,\N,FIT
105,Michele,Alboreto,\N,ALB
106,Olivier,Beretta,\N,BER
107,Roland,Ratzenberger,\N,RAT


Drivers who received newly generated codes:


driverId,forename,surname,code,generated_code
100,Érik,Comas,\N,COM
101,David,Brabham,\N,BRA
102,Ayrton,Senna,\N,SEN
103,Éric,Bernard,\N,BER
104,Christian,Fittipaldi,\N,FIT
105,Michele,Alboreto,\N,ALB
106,Olivier,Beretta,\N,BER
107,Roland,Ratzenberger,\N,RAT
108,Paul,Belmondo,\N,BEL
109,Jyrki,Järvilehto,\N,JÄR


In [0]:
# Question 4: Who is the youngest and oldest driver for each race? Create a new column called “Age”

from pyspark.sql.functions import col, datediff, year, lit, concat, rank
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType

# First, get all driver-race combinations from results
race_participants = df_results.select("raceId", "driverId").distinct()

# Get race dates from the races table
race_dates = df_races.select("raceId", "date", "name", "year")

# Join races and drivers to calculate age at race time
driver_ages = race_participants.join(race_dates, on="raceId") \
                              .join(df_drivers.select("driverId", "dob", "forename", "surname"), on="driverId") \
                              .withColumn("race_date", col("date").cast("date")) \
                              .withColumn("birth_date", col("dob").cast("date")) \
                              .withColumn("age_years", (datediff(col("race_date"), col("birth_date")) / 365.25).cast(IntegerType())) \
                              .withColumn("driver_name", concat(col("forename"), lit(" "), col("surname")))

# Define windows for finding youngest and oldest drivers per race
window_youngest = Window.partitionBy("raceId").orderBy("age_years")
window_oldest = Window.partitionBy("raceId").orderBy(col("age_years").desc())

# Find youngest driver for each race
youngest_drivers = driver_ages.withColumn("rank", rank().over(window_youngest)) \
                             .filter(col("rank") == 1) \
                             .select("raceId", "driver_name", "age_years") \
                             .withColumnRenamed("driver_name", "youngest_driver") \
                             .withColumnRenamed("age_years", "youngest_age")

# Find oldest driver for each race
oldest_drivers = driver_ages.withColumn("rank", rank().over(window_oldest)) \
                           .filter(col("rank") == 1) \
                           .select("raceId", "driver_name", "age_years") \
                           .withColumnRenamed("driver_name", "oldest_driver") \
                           .withColumnRenamed("age_years", "oldest_age")

# Combine youngest and oldest driver information
age_results = youngest_drivers.join(oldest_drivers, on="raceId") \
                             .join(race_dates, on="raceId") \
                             .select("year", "name", "youngest_driver", "youngest_age", "oldest_driver", "oldest_age")

# Display the results sorted by year and race name
display(age_results.orderBy("year", "name"))

# Also save the original driver_ages DataFrame which contains the "Age" column for all drivers
display(driver_ages.select("raceId", "name", "year", "driver_name", "age_years").orderBy("year", "name", "driver_name"))

# Add explanation of the age calculation approach
"""
Age Calculation Approach:
1. Created a new "age_years" column that calculates each driver's age at the time of the race
2. Age is calculated by finding the difference between race date and birth date in days
3. Divided this difference by 365.25 to convert to years (accounting for leap years)
4. For each race, identified both the youngest and oldest driver
5. This approach gives an accurate age count based on how many birthdays each driver had experienced by race day
"""

year,name,youngest_driver,youngest_age,oldest_driver,oldest_age
1950,Belgian Grand Prix,Geoff Crossley,29,Philippe Étancelin,53
1950,British Grand Prix,Geoff Crossley,29,Philippe Étancelin,53
1950,French Grand Prix,José Froilán González,27,Philippe Étancelin,53
1950,Indianapolis 500,Jimmy Davies,20,Mauri Rose,44
1950,Indianapolis 500,Troy Ruttman,20,Mauri Rose,44
1950,Italian Grand Prix,Alberto Ascari,32,Philippe Étancelin,53
1950,Italian Grand Prix,Maurice Trintignant,32,Philippe Étancelin,53
1950,Monaco Grand Prix,José Froilán González,27,Philippe Étancelin,53
1950,Swiss Grand Prix,Harry Schell,28,Philippe Étancelin,53
1951,Belgian Grand Prix,Alberto Ascari,32,Philippe Étancelin,54


raceId,name,year,driver_name,age_years
837,Belgian Grand Prix,1950,Alberto Ascari,31
837,Belgian Grand Prix,1950,Eugène Chaboud,43
837,Belgian Grand Prix,1950,Geoff Crossley,29
837,Belgian Grand Prix,1950,Johnny Claes,33
837,Belgian Grand Prix,1950,Juan Fangio,38
837,Belgian Grand Prix,1950,Louis Rosier,44
837,Belgian Grand Prix,1950,Luigi Fagioli,52
837,Belgian Grand Prix,1950,Luigi Villoresi,41
837,Belgian Grand Prix,1950,Nino Farina,43
837,Belgian Grand Prix,1950,Philippe Étancelin,53


'\nAge Calculation Approach:\n1. Created a new "age_years" column that calculates each driver\'s age at the time of the race\n2. Age is calculated by finding the difference between race date and birth date in days\n3. Divided this difference by 365.25 to convert to years (accounting for leap years)\n4. For each race, identified both the youngest and oldest driver\n5. This approach gives an accurate age count based on how many birthdays each driver had experienced by race day\n'

In [0]:
# Question 6 (my own) : Which nationality is most common for the drivers? 

# Question 6: Which nationality is the most common among F1 drivers?

from pyspark.sql.functions import count, desc

# Count the number of drivers by nationality
nationality_counts = df_drivers.groupBy("nationality") \
                              .agg(count("driverId").alias("driver_count"))

# Sort by count in descending order to see most common nationalities
sorted_nationalities = nationality_counts.orderBy(desc("driver_count"))

# Display the results
display(sorted_nationalities)

# Calculate percentage of total for each nationality
total_drivers = df_drivers.count()
nationality_percentages = nationality_counts.withColumn(
    "percentage", 
    (col("driver_count") / total_drivers * 100).cast("decimal(5,2)")
)

# Sort by percentage in descending order
sorted_percentages = nationality_percentages.orderBy(desc("percentage"))

# Display the results with percentages
display(sorted_percentages)

# Create a visualization showing the top 10 nationalities
top_10_nationalities = sorted_nationalities.limit(10)
display(top_10_nationalities)

# Add explanation
"""
Nationality Analysis:
1. Counted the number of drivers from each nationality
2. Calculated what percentage of total F1 drivers each nationality represents
3. Identified the most common nationality in Formula 1 history
4. This analysis helps understand which countries have contributed the most drivers to F1
"""

nationality,driver_count
British,165
American,158
Italian,99
French,73
German,50
Brazilian,32
Argentine,24
Swiss,23
South African,23
Belgian,23


nationality,driver_count,percentage
British,165,19.25
American,158,18.44
Italian,99,11.55
French,73,8.52
German,50,5.83
Brazilian,32,3.73
Argentine,24,2.8
Swiss,23,2.68
South African,23,2.68
Belgian,23,2.68


nationality,driver_count
British,165
American,158
Italian,99
French,73
German,50
Brazilian,32
Argentine,24
Swiss,23
South African,23
Belgian,23


'\nNationality Analysis:\n1. Counted the number of drivers from each nationality\n2. Calculated what percentage of total F1 drivers each nationality represents\n3. Identified the most common nationality in Formula 1 history\n4. This analysis helps understand which countries have contributed the most drivers to F1\n'