Q1

In [0]:
# Read all datasets from DBFS. The first row of each file is used as the header
# and column types are inferred by Spark.
# NOTE(review): the results data is later checked for the literal "\N" string,
# so these CSVs may encode missing values as "\N" rather than as empty fields —
# confirm; with default options "\N" is read as an ordinary string, not null.
df_laptimes = spark.read.options(header=True, inferSchema=True).csv(
    "/FileStore/tables/lap_times.csv"
)
df_drivers = spark.read.options(header=True, inferSchema=True).csv(
    "/FileStore/tables/drivers.csv"
)
df_pitstops = spark.read.options(header=True, inferSchema=True).csv(
    "/FileStore/tables/pit_stops.csv"
)
df_results = spark.read.options(header=True, inferSchema=True).csv(
    "/FileStore/tables/results.csv"
)
df_races = spark.read.options(header=True, inferSchema=True).csv(
    "/FileStore/tables/races.csv"
)


In [0]:
from pyspark.sql.functions import avg, round as spark_round

# Average pit-stop duration for every (race, driver) pair, rounded to 2 decimals.
# NOTE(review): if inferSchema read `duration` as a string, avg() casts it
# implicitly; any value that does not parse as a number would become null (or
# raise when ANSI mode is enabled) — check df_pitstops.printSchema().
df_avg_pitstop = (
    df_pitstops
    .groupBy("raceId", "driverId")
    .agg(spark_round(avg("duration"), 2).alias("avg_pit_duration"))
)

df_avg_pitstop.show(n=10)


Q2

In [0]:
# Race winners: result rows whose finishing position equals 1.
# Rows whose position is not numeric (e.g. a "\N" marker) do not match the
# comparison and are dropped.
df_winners = (
    df_results
    .where("position = 1")
    .select("raceId", "driverId")
)


In [0]:
# Attach each winner's average pit-stop time for that race. The left join keeps
# winners with no matching pit-stop rows; their avg_pit_duration is null.
df_winner_pitstop = df_winners.join(
    df_avg_pitstop,
    ["raceId", "driverId"],
    "left",
)


In [0]:
from pyspark.sql.functions import asc_nulls_last

# Sort winners from shortest to longest average pit stop and show the top 10.
# Winners without pit-stop data carry a null avg_pit_duration (left join above).
# Spark's default ascending order puts nulls FIRST, which would fill the shown
# rows with winners that have no measurement, so nulls are pushed to the end.
df_winner_pitstop.orderBy(asc_nulls_last("avg_pit_duration")).show(10)


Q3

In [0]:
from pyspark.sql.functions import when, col, upper, substring

# Fill missing driver codes with the upper-cased first 3 letters of the surname.
# A code counts as missing when it is null, empty, or the literal "\N" string.
# This notebook treats "\N" as the missing-value marker in the results data
# (Q5). Spark's CSV reader only maps empty fields to null by default, so "\N"
# would otherwise survive as a real-looking code.
df_drivers_fixed = df_drivers.withColumn(
    "code",
    when(
        col("code").isNull() | (col("code") == "") | (col("code") == "\\N"),
        upper(substring(col("surname"), 1, 3)),
    ).otherwise(col("code")),
)



In [0]:
# Preview identifying columns next to each driver's (existing or generated) code.
df_drivers_fixed.select(["driverId", "forename", "surname", "code"]).show(n=10)


Q4

In [0]:
from pyspark.sql.functions import to_date, current_date, datediff, months_between, floor, col

# Convert dob to a date and compute each driver's age in completed years.
# Dividing a day difference by 365 ignores leap days, so for older drivers the
# age ticks over a few days before the actual birthday. floor(months / 12) gives
# the exact number of completed years.
# NOTE(review): this is the age as of today, not at the date of any particular
# race — confirm that is what the per-race youngest/oldest question intends.
df_drivers_with_age = (
    df_drivers_fixed
    .withColumn("dob", to_date("dob"))
    .withColumn(
        "Age",
        floor(months_between(current_date(), col("dob")) / 12).cast("int"),
    )
)


In [0]:
# Pair every result row (race, driver) with that driver's Age. The left join
# keeps result rows whose driverId has no match in the drivers table (Age null).
df_race_driver_age = (
    df_results
    .select("raceId", "driverId")
    .join(df_drivers_with_age.select("driverId", "Age"), "driverId", "left")
)


In [0]:
from pyspark.sql.functions import min as spark_min, max as spark_max

# For each race, the youngest and oldest Age among its result rows.
# The Spark aggregates are imported under aliases (the same pattern Q1 uses for
# round) so they do not shadow Python's built-in min()/max() in later cells.
df_race_age_extremes = df_race_driver_age.groupBy("raceId").agg(
    spark_min("Age").alias("youngest_age"),
    spark_max("Age").alias("oldest_age"),
)

df_race_age_extremes.show(10)


Q5

In [0]:
# Wins per driver: result rows with finishing position 1.
df_win_count = (
    df_results
    .where("position = 1")
    .groupBy("driverId")
    .count()
    .withColumnRenamed("count", "win_count")
)

# Non-finishes (treated as "losses") per driver: position is null or holds the
# literal "\N" marker.
# NOTE(review): classified finishes other than 1st are counted in neither
# column — confirm this loss definition matches the Q5 question.
df_loss_count = (
    df_results
    .where(col("position").isNull() | (col("position") == "\\N"))
    .groupBy("driverId")
    .count()
    .withColumnRenamed("count", "loss_count")
)


In [0]:
# Full outer join keeps drivers that appear on only one side; fillna(0) then
# turns the missing count into 0. Sorted by wins, then losses, both descending.
df_win_loss = (
    df_win_count
    .join(df_loss_count, "driverId", "outer")
    .fillna(0)
)
df_win_loss.orderBy(["win_count", "loss_count"], ascending=[False, False]).show(10)


Q6: Which driver has participated in the most races?

In [0]:
from pyspark.sql.functions import countDistinct

# Number of distinct races each driver appears in. Counting distinct raceIds
# (rather than result rows) ensures a driver with more than one results row in
# the same race is still counted once for that race.
df_race_count = df_results.groupBy("driverId").agg(
    countDistinct("raceId").alias("races_participated")
)

# Attach driver names; the left join keeps ids missing from the drivers table.
df_driver_participation = df_race_count.join(
    df_drivers_fixed.select("driverId", "forename", "surname"),
    on="driverId",
    how="left",
)

# Sort descending and show top 10
df_driver_participation.orderBy("races_participated", ascending=False).show(10)
