In [0]:
import pandas as pd
import numpy as np
from pyspark.sql.types import IntegerType, FloatType
from pyspark.sql.functions import avg, current_date, col, year, date_diff,floor, count
from pyspark.sql import functions as F
from pyspark.sql import Window
from pyspark.sql.functions import min, max, sum, lit, desc, asc
from pyspark.sql.functions import col, upper, substring, when
     


In [0]:
# Raw pit-stop records; the first CSV row is the header and every column is read as a string.
df_pitstops = (spark.read
               .option('header', True)
               .csv('s3://columbia-gr5069-main/raw/pit_stops.csv'))
display(df_pitstops)

### Q1: [10 pts] What was the average time each driver spent at the pit stop for each race?


In [0]:
# Cast columns to FloatType
df_pitstops = df_pitstops.withColumn('milliseconds', df_pitstops['milliseconds'].cast(FloatType()))
display(df_pitstops)

In [0]:

# Q1: mean of the 'milliseconds' column for every (driver, race) pair.
# Spot-check suggestion: driverId 1 in raceIds 1072 and 1078.
avg_pitstop_time = (
    df_pitstops
    .groupBy(['driverId', 'raceId'])
    .agg(F.avg('milliseconds').alias('avg_pitstop_time'))
)
display(avg_pitstop_time)

### Q2: [20 pts] Rank the average time spent at the pit stop in order of who won each race

In [0]:
# Race results. The raw CSV writes missing values as the literal two-character marker \N
# (e.g. the finishing position of a driver who did not finish). Reading them as real
# nulls makes the isNull()/isNotNull() checks used later in this notebook work.
race_results = spark.read.csv('s3://columbia-gr5069-main/raw/results.csv',
                              header=True,
                              nullValue='\\N')
display(race_results)

In [0]:
# Finishing position of each driver in each race.
# `position` is read from the CSV as a string, so convert it to an integer. Anything
# non-numeric (null, or the raw '\N' marker) becomes null. This gives a numeric sort
# (1, 2, ..., 10) instead of a lexicographic one (1, 10, 11, 2, ...).
finish_position = F.when(
    F.col('position').cast('string').rlike('^[0-9]+$'),
    F.col('position').cast(IntegerType()),
)
result_driver = (race_results
    .select('raceId', 'driverId', finish_position.alias('win_position'))
    .distinct())

# Keep only (race, driver) pairs that have pit-stop data.
# Non-finishers (null position) sort last.
pitstop_results_df = (avg_pitstop_time
    .join(result_driver, on=['raceId', 'driverId'], how='inner')
    .orderBy(F.col('win_position').asc_nulls_last(), F.col('raceId').cast(IntegerType())))
display(pitstop_results_df)

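If a driver did not finish (DNF), they have no classified finishing position. The raw results file records this as `\N`; it should be treated as missing, i.e. `win_position` is null and is displayed as 'NA'.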

In [0]:
# Q2 ranking.
# overall_pit_rank: rank of every (race, driver) pair by average pit-stop time across ALL
#   races (1 = shortest). raceId/driverId break ties so row_number() is reproducible on re-runs.
# race_pit_rank: the same ranking restricted to the drivers of a single race.
# NOTE: the un-partitioned window moves every row into one partition; acceptable for a
#   table of this size but it does not scale.
window_spec = Window.orderBy('avg_pitstop_time', 'raceId', 'driverId')
race_window = Window.partitionBy('raceId').orderBy('avg_pitstop_time', 'driverId')

# Numeric finishing position, robust to win_position being an int or a string.
# Non-numeric values such as the raw '\N' marker become null.
finish_pos = F.when(
    F.col('win_position').cast('string').rlike('^[0-9]+$'),
    F.col('win_position').cast(IntegerType()),
)

ranked_pitstop_results_df = (pitstop_results_df
    .withColumn('overall_pit_rank', F.row_number().over(window_spec))
    .withColumn('race_pit_rank', F.row_number().over(race_window))
    # Drivers without a classified finishing position (did not finish) are shown as 'NA'.
    .withColumn('race_position',
                F.when(finish_pos.isNull(), 'NA').otherwise(F.col('win_position').cast('string')))
    # Final sort by race finishing order, non-finishers last.
    .orderBy(finish_pos.asc_nulls_last(), 'overall_pit_rank')
)

display(ranked_pitstop_results_df)

### Q3: [20 pts] Insert the missing code (e.g., ALO for Alonso) for drivers based on the 'drivers' dataset


In [0]:
# Driver reference table. The raw CSV writes missing values (e.g. an absent three-letter
# `code`) as the literal two-character marker \N; read them as real nulls.
df_drivers = spark.read.csv('s3://columbia-gr5069-main/raw/drivers.csv',
                            header=True,
                            nullValue='\\N')
display(df_drivers)

In [0]:
%python
# Update the 'code' column
df_drivers = df_drivers.withColumn('code', upper(substring(col('surname'), 1, 3)))

# Create the 'code_complete' column
df_drivers = df_drivers.withColumn('code_complete', when(col('code') == '/N', 'N').otherwise('Y'))

# Display the result
display(df_drivers)

### Q4: [20 pts] Who is the youngest and oldest driver for each race? Create a new column called “Age”.


In [0]:
# Q4 step 1: each driver's age in completed years, measured at today's date from `dob`.
# floor(months_between / 12) counts whole years exactly. A datediff / 365.25 approximation
# is off by one near some birthdays: e.g. 365 days after a birthday gives
# 365 / 365.25 = 0.999, which truncates to 0 instead of 1.
# NOTE(review): the question is about each race; age at race time would need the race date
#   (races.csv is not loaded in this notebook). TODO confirm whether that is required.
df_drivers = df_drivers.withColumn(
    'age',
    F.floor(F.months_between(F.current_date(), F.to_date(F.col('dob'))) / 12).cast('int'),
)

# Display the result
display(df_drivers)

In [0]:
# Attach driver attributes (surname, code, dob, age) to every race result row.
# Only the columns needed downstream are taken from df_drivers. Joining the whole table
# would duplicate any column name present in both files (NOTE(review): presumably
# `number`), making later references to it ambiguous.
# Dropping these driver columns first is a no-op on the first run (drop ignores absent
# names) and keeps the cell idempotent if it is re-executed.
driver_cols = ['driverId', 'surname', 'code', 'dob', 'age']
race_results = (race_results
    .drop(*driver_cols[1:])
    .join(df_drivers.select(*driver_cols), on='driverId', how='inner'))
display(race_results)

In [0]:
# Q4 step 2: oldest and youngest driver in each race.
# Each surname must come from the same row as the extreme age. F.max/F.min on `surname`
# would instead return the alphabetically last/first surname of the race.
# min_by/max_by return the surname with the earliest/latest date of birth; with one common
# reference date, the earliest dob also has the largest age. Ties are resolved arbitrarily.
dob_date = F.to_date(F.col('dob'))
oldest_youngest_df = race_results.groupBy('raceId').agg(
    F.max(F.col('age')).alias('oldest_age'),
    F.min_by(F.col('surname'), dob_date).alias('oldest_surname'),
    F.min(F.col('age')).alias('youngest_age'),
    F.max_by(F.col('surname'), dob_date).alias('youngest_surname'),
)
display(oldest_youngest_df)
     

### Q5: [20 pts] For a given race, which driver has the most wins and losses?

In [0]:

# Q5: each driver's running career record before every race they entered.
# The source is the full race results. pitstop_results_df only covers (race, driver) pairs
# that have pit-stop data, so counting from it skips races and under-counts careers.
#   total_wins    = races won (position 1) strictly before the current race
#   career_losses = races finished in a position > 1 strictly before the current race;
#                   unclassified results (null position, e.g. did not finish) count as neither.
result_position = F.when(
    F.col('position').cast('string').rlike('^[0-9]+$'),
    F.col('position').cast(IntegerType()),
)
career_results_df = race_results.select('raceId', 'driverId', result_position.alias('win_position'))

# raceId is read as a string, so cast it for numeric ordering ('1000' < '2' as text).
# NOTE(review): raceId is used as a proxy for chronological order; confirm against race dates.
window_spec = (Window
    .partitionBy('driverId')
    .orderBy(F.col('raceId').cast(IntegerType()))
    .rowsBetween(Window.unboundedPreceding, -1)  # strictly earlier races only
)

wins_losses_df = (career_results_df
    .withColumn(
        'total_wins',
        # coalesce: a driver's first race has an empty frame, whose sum is null
        F.coalesce(
            F.sum(F.when(F.col('win_position') == 1, 1).otherwise(0)).over(window_spec),
            F.lit(0)))
    .withColumn(
        'career_losses',
        F.coalesce(
            F.sum(
                F.when((F.col('win_position') > 1) & F.col('win_position').isNotNull(), 1)
                 .otherwise(0)
            ).over(window_spec),
            F.lit(0)))
)
display(wins_losses_df)

### Q6: [10 pts] Continue exploring the data by answering your own question.
My question: Which drivers have won the most races overall? This identifies the best performers.

In [0]:
# Load the driver standings table (header row; every column read as a string).
df_standings = (spark.read
                .option('header', True)
                .csv('s3://columbia-gr5069-main/raw/driver_standings.csv'))

In [0]:
# Attach driver details (surname, dob, ...) to each standings row via driverId.
df_standings = df_standings.join(df_drivers, 'driverId', 'inner')
display(df_standings)

In [0]:
# Q6: career race wins per driver, most wins first.
# driver_standings records the championship standings after each race, so its `wins`
# column is a season-to-date running total (Ergast schema). Summing it over every row
# counts each victory once for every remaining race of that season.
# Instead, count race victories (finishing position 1) directly in the race results.
race_wins = (race_results
    .filter(F.col('position').cast('string') == '1')
    .groupBy('driverId')
    .agg(F.count(F.lit(1)).alias('wins_count')))

df_sum_win = (race_wins
    .join(df_drivers.select('driverId', 'surname'), on='driverId', how='inner')
    .select('driverId', 'surname', 'wins_count')
    .orderBy(F.col('wins_count').desc(), 'surname'))  # most wins first; surname breaks ties
display(df_sum_win)