1. [`10 pts`] What was the average time each driver spent at the pit stop for each race?
2. [`20 pts`] Rank the average time spent at the pit stop in order of who won each race
3. [`20 pts`] Insert the missing code (e.g., ALO for Alonso) for drivers based on the 'drivers' dataset
4. [`20 pts`] Who is the youngest and oldest driver for each race? Create a new column called “Age”
5. [`20 pts`] For a given race, which driver has the most wins and losses?
6. [`10 pts`] Continue exploring the data by answering your own question.

In [0]:
# import relevant functions
from pyspark.sql import functions as F
from pyspark.sql.functions import avg
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

In [0]:
# import data
# Every raw table lives under the same S3 prefix. Each CSV is read with its header
# row supplying column names; no schema is inferred, so columns load as strings.
def _read_raw(table_name):
    """Return the DataFrame for ``<table_name>.csv`` under the raw-data S3 prefix."""
    return spark.read.csv(f"s3://columbia-gr5069-main/raw/{table_name}.csv", header=True)


circuits = _read_raw("circuits")
constructor_results = _read_raw("constructor_results")
constructor_standings = _read_raw("constructor_standings")
constructors = _read_raw("constructors")
driver_standings = _read_raw("driver_standings")
drivers = _read_raw("drivers")
lap_times = _read_raw("lap_times")
pit_stops = _read_raw("pit_stops")
qualifying = _read_raw("qualifying")
races = _read_raw("races")
results = _read_raw("results")
seasons = _read_raw("seasons")
sprint_results = _read_raw("sprint_results")
status = _read_raw("status")
winequality = _read_raw("winequality-red_main")

In [0]:
# 1. [`10 pts`] What was the average time each driver spent at the pit stop for each race?

# create new column with full name of drivers
drivers = drivers.withColumn("full_name", F.concat_ws(" ", drivers["forename"], drivers["surname"]))

# Group by race id and driver id and calculate the average pit-stop duration in seconds.
# The numeric "milliseconds" column is averaged rather than the text "duration" column:
# NOTE(review): "duration" appears to switch to a "m:ss.sss" form for very long stops
# (e.g. red flags), which does not cast to a number and so was silently dropped from the
# mean (or raises under ANSI casting) — verify against the raw file.
# The original "avg(duration)" column name is kept so later cells see the same schema.
avg_times = pit_stops.groupBy("raceId", "driverId").agg(
    (avg(col("milliseconds").cast("double")) / 1000).alias("avg(duration)")
)

# join with the drivers table to get names of drivers alongside their average pit-stop duration
avg_times = avg_times.join(drivers.select("driverId", "full_name"), "driverId", "left_outer")

display(avg_times)

In [0]:
# 2. [`20 pts`] Rank the average time spent at the pit stop in order of who won each race

# add position column to avg_times dataframe using race ID and driver ID
avg_times = avg_times.join(results.select("position", "raceId", "driverId"), on=["raceId", "driverId"], how="left_outer")

# Sort by race and then by finishing position. The CSVs are read without a schema, so
# raceId and position are strings and would sort lexically ("10" before "2"); sort on
# integer values instead. Positions that are not plain digits (non-finishers' placeholder)
# or have no matching result become null and go to the bottom of their race's section.
# The digit check guards the cast so it cannot fail under ANSI casting rules.
display(
    avg_times.sort(
        col("raceId").cast("int"),
        F.when(col("position").rlike("^[0-9]+$"), col("position").cast("int")).asc_nulls_last(),
    )
)


In [0]:
# 3. [`20 pts`] Insert the missing code (e.g., ALO for Alonso) for drivers based on the 'drivers' dataset

# Only drivers WITHOUT a code get one; existing official codes are kept as-is.
# The CSV is read without a null marker, so a missing code may show up either as a real
# null or as the literal placeholder "\N" (presumably what the raw file uses — verify).
missing_code = F.col("code").isNull() | (F.col("code") == "\\N")

# Generated code: first three letters of the surname, upper-cased. Non-letter characters
# (spaces, apostrophes, hyphens) are stripped first so multi-word surnames such as
# "de Cesaris" give "DEC" rather than "DE ".
generated_code = F.upper(F.substring(F.regexp_replace("surname", "[^\\p{L}]", ""), 1, 3))

drivers = drivers.withColumn("code", F.when(missing_code, generated_code).otherwise(F.col("code")))

display(drivers)

In [0]:
# 4. [`20 pts`] Who is the youngest and oldest driver for each race? Create a new column called “Age”

# Age in completed years on race day. months_between(...)/12 respects leap years and
# birthdays; dividing a day count by 365 over-counts (it reports e.g. 20 a few days before
# a 20th birthday because of the accumulated leap days).
age_on_race_day = F.floor(F.months_between(col("date"), col("dob")) / 12).cast(IntegerType())

# add driver date of birth and date of race to the pit-stop dataframe, then its Age column
avg_times = avg_times.join(drivers.select("driverId", "dob"), on="driverId")
avg_times = avg_times.join(races.select("raceId", "date"), on="raceId")
avg_times = avg_times.withColumn("Age", age_on_race_day)
display(avg_times)

# Youngest/oldest are chosen among every driver entered in the race (results), not just
# drivers present in avg_times: a driver with no pit_stops row never appears there.
# Exact age in days is used for ordering so two drivers with the same whole-year Age are
# distinguished by who is actually younger, not by name.
race_entrants = (
    results.select("raceId", "driverId")
    .join(drivers.select("driverId", "full_name", "dob"), on="driverId")
    .join(races.select("raceId", "date"), on="raceId")
    .withColumn("Age", age_on_race_day)
    .withColumn("age_days", F.datediff(col("date"), col("dob")))
    # rows whose dates don't parse have no age and can't be ranked
    .filter(col("age_days").isNotNull())
)

# new dataframe with youngest driver (age and name) and oldest driver (age and name) per race
race_ages = (
    race_entrants.groupBy("raceId")
    .agg(
        F.min(F.struct("age_days", "Age", "full_name")).alias("youngest"),
        F.max(F.struct("age_days", "Age", "full_name")).alias("oldest"),
    )
    # drop the helper field so each result is a struct of (Age, full_name)
    .select(
        "raceId",
        F.struct("youngest.Age", "youngest.full_name").alias("youngest"),
        F.struct("oldest.Age", "oldest.full_name").alias("oldest"),
    )
)
display(race_ages)


In [0]:
# 5. [`20 pts`] For a given race, which driver has the most wins and losses?
# For a given race, count how many earlier races each driver has won, and how many earlier
# races each driver completed without winning.

# One row per race result: race ID and date, driver ID, finishing position and driver name,
# ordered chronologically by race date.
driver_df = results.select("raceId", "driverId", "position").join(
    drivers.select("driverId", "full_name"),
    on="driverId",
)
race_dates = races.select("raceId", "date")
sorted_races = race_dates.join(driver_df, on="raceId", how="left").orderBy(F.col("date").asc())
display(sorted_races)

In [0]:
def func(race, driver, sorted_races):
    """
    function that takes in race ID, driver ID, and sorted dataframe and returns the number of wins
    and the number of non-winning completions the driver had in races held before the given race

    Inputs:
        race: "raceId" of the race of interest
        driver: "driverId" of the driver of interest
        sorted_races: dataframe that contains race ID, date of race, driver ID, position that driver got on race, and driver name

    Outputs:
        win_condition: number of earlier races in which the driver's position equals 1
        completed_condition: number of earlier races in which the driver's position is greater than 1

    Raises:
        ValueError: if `race` does not appear in sorted_races
    """

    # get date of given race using race ID; fail with a clear message instead of a
    # TypeError from subscripting None when the race is not present
    race_row = sorted_races.filter(col("raceId") == race).select("date").first()
    if race_row is None:
        raise ValueError(f"race {race!r} not found in sorted_races")
    race_date = race_row["date"]

    # the driver's results from races dated strictly before the given race
    # (dates are compared as stored; ISO yyyy-MM-dd strings order chronologically)
    previous = sorted_races.filter((col("date") < race_date) & (col("driverId") == driver))

    # count wins and non-winning completions in a single Spark job; F.when yields null when
    # its condition is not true, and F.count skips nulls, so each count matches a filter().count()
    counts = previous.agg(
        F.count(F.when(col("position") == 1, True)).alias("wins"),
        F.count(F.when(col("position") > 1, True)).alias("completions"),
    ).first()

    # output values for number of wins and number of completions
    return counts["wins"], counts["completions"]


In [0]:
def apply_func_to_multiple_drivers(race, sorted_races):
    """
    function that computes, for every driver in a given race, the same counts as "func" (see above):
    the number of earlier races the driver won and the number they completed without winning

    The counts are computed with one grouped aggregation over all drivers instead of calling
    "func" once per driver (which launched several Spark jobs per driver).

    Inputs:
        race: "raceId"
        sorted_races: dataframe that contains race ID, date of race, driver ID, position that driver got on race, and driver name

    Outputs:
        driver_performance: dictionary mapping each driver ID in the given race to
            {"win_condition": <previous wins>, "completed_condition": <previous non-winning completions>};
            empty if the race is not present in sorted_races
    """

    race_rows = sorted_races.filter(col("raceId") == race)

    # date of the given race; an unknown race has no participants, so nothing to report
    race_date_row = race_rows.select("date").first()
    if race_date_row is None:
        return {}
    race_date = race_date_row["date"]

    # all drivers that participated in the given race
    entrants = race_rows.select("driverId").distinct()

    # per-driver counts over races dated strictly before the given race; F.when yields null
    # when its condition is not true and F.count skips nulls (same semantics as func)
    history = (
        sorted_races.filter(col("date") < race_date)
        .groupBy("driverId")
        .agg(
            F.count(F.when(col("position") == 1, True)).alias("win_condition"),
            F.count(F.when(col("position") > 1, True)).alias("completed_condition"),
        )
    )

    # drivers with no earlier races have no history row; report zeros for them
    rows = (
        entrants.join(history, on="driverId", how="left")
        .fillna(0, subset=["win_condition", "completed_condition"])
        .collect()
    )

    # output previous wins and completions for each driver in given race
    return {
        row["driverId"]: {
            "win_condition": row["win_condition"],
            "completed_condition": row["completed_condition"],
        }
        for row in rows
    }

In [0]:
# earlier wins and non-winning completions for every driver entered in race 536
apply_func_to_multiple_drivers(race=536, sorted_races=sorted_races)