In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, lit, trim, lower, when, coalesce, first, last,
    concat_ws, concat, udf, collect_list, array_join, explode,
    min as spark_min, max as spark_max, expr
)
from pyspark.sql.types import *
import pyspark.sql.functions as F

# ========== 1. Start spark ==========
# Create (or reuse) the SparkSession that every later cell reads and writes through.
spark = (
    SparkSession.builder
    .appName("F1 Pitstop Pipeline 2018-2024")
    .getOrCreate()
)


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/11/21 14:58:54 WARN Utils: Your hostname, Divyanshs-MacBook-Air.local, resolves to a loopback address: 127.0.0.1; using 10.33.74.20 instead (on interface en0)
25/11/21 14:58:54 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/21 14:58:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
conda install -c conda-forge openjdk=17

[1;33mJupyter detected[0m[1;33m...[0m
[1;32m2[0m[1;32m channel Terms of Service accepted[0m
Channels:
 - conda-forge
 - defaults
Platform: osx-arm64
doneecting package metadata (repodata.json): - 
doneing environment: / 


    current version: 25.7.0
    latest version: 25.9.1

Please update conda by running

    $ conda update -n base -c conda-forge conda



# All requested packages already installed.


Note: you may need to restart the kernel to use updated packages.


In [3]:
# ========== 2. Paths (update to your repo / S3 / HDFS) ==========
# Both folders are resolved relative to the notebook's working directory.
RAW_DIR = "./data/raw"           # raw input CSVs
OUT_DIR = "./data/processed"     # every output of the pipeline is written here

# Core input files
PIT1_PATH = RAW_DIR + "/pitstop_1st.csv"
PIT2_PATH = RAW_DIR + "/pitstop_2nd.csv"
SC_PATH   = RAW_DIR + "/safety_cars.csv"
RF_PATH   = RAW_DIR + "/red_flags.csv"

# Optional lookup tables (download from Ergast/Kaggle or supply your own)
RACES_PATH   = RAW_DIR + "/races.csv"    # maps raceId -> Season, Round, RaceName, Date
DRIVERS_PATH = RAW_DIR + "/drivers.csv"  # maps driverId -> driverName, abbreviation

Next, check that every input file exists at the expected path.

In [4]:
import os

# Report where relative paths are resolved from, then whether each input file exists.
print("Current working directory:", os.getcwd())
print("\nChecking file existence:")
files_to_check = {
    "Pitstops 1st": PIT1_PATH,
    "Pitstops 2nd": PIT2_PATH,
    "Safety Car": SC_PATH,
    "Red Flag": RF_PATH,
    "Races Lookup": RACES_PATH,
    "Drivers Lookup": DRIVERS_PATH
}

for label, file_path in files_to_check.items():
    status = "yes" if os.path.exists(file_path) else "no"
    print(f"{status} {label}: {file_path}")

# List what is actually inside the raw directory, to help spot misnamed files.
print(f"\nContents of {RAW_DIR}:")
try:
    if not os.path.exists(RAW_DIR):
        print(f"Directory {RAW_DIR} does not exist!")
    else:
        for entry in os.listdir(RAW_DIR):
            print(f"  - {entry}")
except Exception as err:
    print(f"Error listing directory: {err}")

Current working directory: /Users/divyanshdoshi/Documents/GitHub/Cloud_Formula1_Data_Cleaning_Pipeline

Checking file existence:
yes Pitstops 1st: ./data/raw/pitstop_1st.csv
yes Pitstops 2nd: ./data/raw/pitstop_2nd.csv
yes Safety Car: ./data/raw/safety_cars.csv
yes Red Flag: ./data/raw/red_flags.csv
yes Races Lookup: ./data/raw/races.csv
yes Drivers Lookup: ./data/raw/drivers.csv

Contents of ./data/raw:
  - pitstop_1st.csv
  - safety_cars.csv
  - drivers.csv
  - red_flags.csv
  - races.csv
  - pitstop_2nd.csv


All the dataset files are present, so we can now start cleaning and joining the data.

The main aim of this pipeline is to produce a clean pit-stop dataset. The raw data has several problems, such as mis-encoded names and missing values in some places, which we tackle using Spark.
The final goal is to use this data to predict better pit-stop strategies, so we also need to know when a pit stop was made under a safety car, because a safety car can change the whole race dynamic and force a change of strategy.

In [5]:

# No schema is inferred, so every column is read as a string; numeric columns
# are cast explicitly in a later cell.
# multiLine / escape are set for the pit-stop files so quoted fields may span
# lines and contain quote characters.
pit2 = spark.read.options(header=True, multiLine=True, escape='"').csv(PIT2_PATH)
pit1 = spark.read.options(header=True, multiLine=True, escape='"').csv(PIT1_PATH)
safety = spark.read.options(header=True).csv(SC_PATH)
redflag = spark.read.options(header=True).csv(RF_PATH)


def _read_optional_lookup(path):
    """Read the headered CSV at `path`; return None (and say why) if Spark cannot read it."""
    try:
        return spark.read.options(header=True).csv(path)
    except Exception as exc:
        # Best effort: the lookups are optional, but report which one failed and why.
        print(f"Could not load lookup {path}: {type(exc).__name__}: {exc}")
        return None


# Optional lookups (if present). Each file is loaded independently so that a
# missing races.csv does not prevent drivers.csv from being read.
races_df = _read_optional_lookup(RACES_PATH)
drivers_df = _read_optional_lookup(DRIVERS_PATH)
if races_df is None or drivers_df is None:
    print("Lookups not provided yet; pipeline will attempt fuzzy joins or require you to add lookups.")


This code loads your raw data files (pit stops, safety car, red flag data, races, drivers) into Spark DataFrames so we can process them.



In [6]:
def basic_clean(df, col_names):
    """Trim the listed columns and turn values that are empty after trimming into null.

    Names in `col_names` that are not columns of `df` are skipped.
    Returns the transformed DataFrame.
    """
    for name in col_names:
        if name not in df.columns:
            continue
        trimmed = trim(col(name))
        # Empty string after trimming -> null; everything else keeps its trimmed value.
        df = df.withColumn(name, when(trimmed == "", None).otherwise(trimmed))
    return df

# Trim / null-out empty strings in every column of each raw table.
pit2, pit1, safety, redflag = (
    basic_clean(frame, frame.columns) for frame in (pit2, pit1, safety, redflag)
)

# Known mis-encoded spellings of a driver name and their correct form.
# Only values that equal a key exactly are replaced.
encoding_fixes = {
    "Kimi R√É∆í√Ç¬§ikk√É∆í√Ç¬∂nen": "Kimi Räikkönen",
    "RÃ¤ikkÃ¶nen": "Kimi Räikkönen"
}


def _repair_name(name):
    """Return the corrected spelling for a known bad value; pass other values (and None) through."""
    if name is None:
        return None
    return encoding_fixes.get(name, name)


fix_udf = F.udf(_repair_name, StringType())
if "Driver" in pit2.columns:
    pit2 = pit2.withColumn("Driver", fix_udf(col("Driver")))



This code trims stray whitespace, converts empty strings to nulls, and repairs mis-encoded driver names.

In [8]:
# Target type for each numeric column of pit2. Both spellings of the
# stint-length header are listed because the CSV header may or may not contain
# a space; columns that are absent are simply skipped.
numeric_casts = {
    "Season": IntegerType(),
    "Round": IntegerType(),
    "Laps": IntegerType(),
    "Position": IntegerType(),
    "TotalPitStops": IntegerType(),
    "Stint": IntegerType(),
    "Stint Length": IntegerType(),
    "StintLength": IntegerType(),
    "Pit_Lap": IntegerType(),
}

# Target type for each numeric column of pit1.
pit1_numeric_casts = {
    "raceId": IntegerType(),
    "driverId": IntegerType(),
    "stop": IntegerType(),
    "lap": IntegerType(),
    "milliseconds": IntegerType(),
    "duration": DoubleType(),  # duration may be text like '26.898'
}


def safe_cast(df, colname, dtype):
    """Cast `colname` of `df` to `dtype` when the column exists.

    Returns `df` unchanged when the column is absent or the cast expression
    cannot be built. The cast itself is lazy: values that cannot be converted
    are only dealt with by Spark when an action runs, not here.
    """
    if colname not in df.columns:
        return df
    try:
        return df.withColumn(colname, col(colname).cast(dtype))
    except Exception as exc:
        # Best effort: keep the column as-is, but do not hide the failure.
        print(f"Could not cast {colname!r} to {dtype}: {exc}")
        return df


# Apply the casts; the dictionaries above are the single source of truth.
for c, dtype in numeric_casts.items():
    pit2 = safe_cast(pit2, c, dtype)

for c, dtype in pit1_numeric_casts.items():
    pit1 = safe_cast(pit1, c, dtype)

This code converts text columns into numeric types so that they can be used for proper analysis.

In [9]:
def first_non_null(arr):
    """Return the first element of `arr` that is neither None nor blank.

    An element is blank when its string form is empty after stripping
    whitespace. Returns None when `arr` is None or has no such element.
    """
    if arr is None:
        return None
    usable = (item for item in arr if item is not None and str(item).strip() != "")
    return next(usable, None)


In [10]:
# Wrap the Python helper so it can be applied to array columns; the result is typed as string.
first_non_null_udf = F.udf(first_non_null, StringType())

# Grouping keys: the descriptive columns that are actually present in pit2, in this order.
group_cols = [
    c
    for c in ["Season","Round","Circuit","Driver","Constructor","Race Name","Date","Location","Country","Abbreviation"]
    if c in pit2.columns
]


In [11]:
agg_exprs = []
# Columns whose per-group values are gathered into "<name>_list" arrays.
# Each name is listed once and iterated in list order, so the resulting column
# order is the same on every run (iterating a set of strings is not).
candidates = ["Pit_Lap", "Pit_Time", "Tire Compound", "Stint Length"]
for cand in candidates:
    if cand in pit2.columns:
        agg_exprs.append(F.collect_list(cand).alias(f"{cand}_list"))

In [12]:
# Environmental / summary columns (Air_Temp_C etc.): keep the first value seen per group.
# NOTE: this appends to agg_exprs, so re-running the cell adds the expressions again.
env_columns = ["Air_Temp_C","Track_Temp_C","Humidity_%","Wind_Speed_KMH","AvgPitStopTime"]
agg_exprs.extend(first(env).alias(env) for env in env_columns if env in pit2.columns)


In [13]:
# Build the grouped frame; refuse to continue when none of the expected key columns exist.
if not group_cols:
    raise Exception("Expected at least one grouping column in pit2 (Season/Round/Driver). Check file headers.")

agg_df = pit2.groupBy(*group_cols).agg(*agg_exprs)


In [14]:
# Debug: show which columns the aggregation actually produced.
print("Available columns in agg_df:")
for col_name in agg_df.columns:
    print(f"  - {col_name}")

# Only the collect_list outputs carry the "_list" suffix.
suffix = "_list"
existing_list_columns = [col_name for col_name in agg_df.columns if col_name.endswith(suffix)]
print(f"\nFound list columns: {existing_list_columns}")

# Collapse every list column to its first non-null, non-blank element and
# store it under the base name.
for list_col in existing_list_columns:
    # Strip only the trailing suffix; str.replace would also remove "_list"
    # from the middle of a column name.
    base_col = list_col[: -len(suffix)]
    # NOTE(review): the UDF is declared with a string return type, so the
    # collapsed columns come back typed as string - confirm that is intended.
    agg_df = agg_df.withColumn(base_col, first_non_null_udf(col(list_col)))
    agg_df = agg_df.drop(list_col)

Available columns in agg_df:
  - Season
  - Round
  - Circuit
  - Driver
  - Constructor
  - Race Name
  - Date
  - Location
  - Country
  - Abbreviation
  - Stint Length_list
  - Pit_Lap_list
  - Pit_Time_list
  - Tire Compound_list
  - Air_Temp_C
  - Track_Temp_C
  - Humidity_%
  - Wind_Speed_KMH
  - AvgPitStopTime

Found list columns: ['Stint Length_list', 'Pit_Lap_list', 'Pit_Time_list', 'Tire Compound_list']


In [15]:
# Rebuild agg_df with one row per (Season, Round, Circuit, Driver, Constructor),
# keeping a single value for every other column instead of building list columns.
# ignorenulls=True makes first() skip nulls, so a blank cell (turned into null by
# basic_clean) does not hide a real value elsewhere in the group.
# NOTE(review): without an explicit ordering, which row counts as "first" is up
# to Spark; if pit2 holds several rows per driver and race (e.g. one per stop),
# only one Pit_Lap / Pit_Time survives here - confirm that is intended.
first_value_columns = [
    "Race Name",
    "Date",
    "Location",
    "Country",
    "Abbreviation",
    "Air_Temp_C",
    "Track_Temp_C",
    "Humidity_%",
    "Wind_Speed_KMH",
    "AvgPitStopTime",
    "Pit_Lap",
    "Pit_Time",
    "Tire Compound",
    "Stint Length",
]

agg_df = pit2.groupBy("Season", "Round", "Circuit", "Driver", "Constructor").agg(
    *[first(name, ignorenulls=True).alias(name) for name in first_value_columns]
)

agg_df.show()

25/11/21 15:13:35 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 6:>                                                          (0 + 1) / 1]

+------+-----+--------------------+--------------------+------------+--------------------+----------+---------+---------+------------+-----------------+-----------------+----------+--------------+--------------+-------+--------+-------------+------------+
|Season|Round|             Circuit|              Driver| Constructor|           Race Name|      Date| Location|  Country|Abbreviation|       Air_Temp_C|     Track_Temp_C|Humidity_%|Wind_Speed_KMH|AvgPitStopTime|Pit_Lap|Pit_Time|Tire Compound|Stint Length|
+------+-----+--------------------+--------------------+------------+--------------------+----------+---------+---------+------------+-----------------+-----------------+----------+--------------+--------------+-------+--------+-------------+------------+
|  2018|    1|Albert Park Grand...|     Brendon Hartley|  Toro Rosso|Australian Grand ...|25-03-2018|Melbourne|Australia|         HAR|15.78333333333333|22.28333333333333|        57|          23.8|       22.2545|     22|  22.296|    

                                                                                

In [16]:
# Save the aggregated pit-stop table as headered CSV, replacing any previous output.
agg_df.write.csv(f"{OUT_DIR}/pitstop_aggregated", mode="overwrite", header=True)

In [17]:
# Inspect the safety-car table: schema plus a five-row sample.
print("Safety car DataFrame columns:")
safety.printSchema()
safety.show(5)

# Print both column lists, then their intersection, to see which join keys
# the two tables already share.
agg_columns, safety_columns = agg_df.columns, safety.columns
print("Columns in agg_df:", agg_columns)
print("Columns in safety:", safety_columns)

common_columns = set(agg_columns).intersection(safety_columns)
print("Common columns for joining:", common_columns)


Safety car DataFrame columns:
root
 |-- Race: string (nullable = true)
 |-- Cause: string (nullable = true)
 |-- Deployed: string (nullable = true)
 |-- Retreated: string (nullable = true)
 |-- FullLaps: string (nullable = true)

+--------------------+-------------+--------+---------+--------+
|                Race|        Cause|Deployed|Retreated|FullLaps|
+--------------------+-------------+--------+---------+--------+
|1973 Canadian Gra...|     Accident|      33|       39|       5|
|1993 Brazilian Gr...|Accident/Rain|      29|       38|       8|
|1993 British Gran...| Stranded car|      38|       40|       1|
|1994 San Marino G...|     Accident|       1|        6|       4|
|1995 Belgian Gran...|         Rain|      28|       33|       4|
+--------------------+-------------+--------+---------+--------+
only showing top 5 rows
Columns in agg_df: ['Season', 'Round', 'Circuit', 'Driver', 'Constructor', 'Race Name', 'Date', 'Location', 'Country', 'Abbreviation', 'Air_Temp_C', 'Track_Temp_

In [18]:
# Look at how the Race column of the safety-car data is formatted.
print("Sample Race names from safety data:")
safety.select("Race").distinct().show(10, truncate=False)

# Race values look like "2023 Monaco Grand Prix": a four-digit year followed by the race title.
from pyspark.sql.functions import split, col, regexp_extract

race_text = col("Race")
season_expr = regexp_extract(race_text, r"(\d{4})", 1).cast("int")   # first run of four digits
title_expr = regexp_extract(race_text, r"\d{4} (.*)", 1)              # everything after "<year> "

safety_with_season = safety.withColumn("Season", season_expr)
# The extracted text is stored as "Circuit", but it holds the race title
# (e.g. "Canadian Grand Prix"), not a track name.
safety_with_circuit = safety_with_season.withColumn("Circuit", title_expr)

print("Safety data with extracted columns:")
safety_with_circuit.show(5, truncate=False)

# First join attempt: match on Season plus the extracted "Circuit" text.
final_df = agg_df.join(safety_with_circuit, on=["Season", "Circuit"], how="left")

print("Joined DataFrame sample:")
final_df.select("Season", "Round", "Circuit", "Driver", "Cause", "Deployed").show(10)

Sample Race names from safety data:
+-----------------------------+
|Race                         |
+-----------------------------+
|2011 Belgian Grand Prix      |
|1973 Canadian Grand Prix     |
|2013 United States Grand Prix|
|2016 British Grand Prix      |
|2021 S√£o Paulo Grand Prix   |
|2010 Belgian Grand Prix      |
|2005 Brazilian Grand Prix    |
|2020 Sakhir Grand Prix       |
|2008 German Grand Prix       |
|2009 Brazilian Grand Prix    |
+-----------------------------+
only showing top 10 rows
Safety data with extracted columns:
+--------------------------+-------------+--------+---------+--------+------+---------------------+
|Race                      |Cause        |Deployed|Retreated|FullLaps|Season|Circuit              |
+--------------------------+-------------+--------+---------+--------+------+---------------------+
|1973 Canadian Grand Prix  |Accident     |33      |39       |5       |1973  |Canadian Grand Prix  |
|1993 Brazilian Grand Prix |Accident/Rain|29      |38  

In [19]:
# Number of safety-car rows available for matching.
safety_count = safety_with_circuit.count()
print(f"Total safety car events in dataset: {safety_count}")

# Joined rows that picked up a safety-car record (Cause is only filled on a match).
matched_rows = final_df.filter(col("Cause").isNotNull())
matches = matched_rows.count()
print(f"Races with safety car deployments: {matches}")

# Show a sample of the matched rows.
print("Races with safety car deployments:")
matched_rows.select(
    "Season", "Round", "Circuit", "Driver", "Cause", "Deployed", "Retreated"
).show(10, truncate=False)

# Save the joined data as headered CSV, replacing any previous output.
final_df.write.mode("overwrite").option("header", "true").csv(f"{OUT_DIR}/final_joined_data")

print("Data processing completed successfully!")

Total safety car events in dataset: 362
Races with safety car deployments: 0
Races with safety car deployments:
+------+-----+-------+------+-----+--------+---------+
|Season|Round|Circuit|Driver|Cause|Deployed|Retreated|
+------+-----+-------+------+-----+--------+---------+
+------+-----+-------+------+-----+--------+---------+

Data processing completed successfully!


In [20]:
# Compare the season coverage of the two datasets.
season_range = [
    F.min("Season").alias("min_season"),
    F.max("Season").alias("max_season"),
]

print("Season range in pitstop data:")
agg_df.select(*season_range).show()

print("Season range in safety car data:")
safety_with_circuit.select(*season_range).show()

# Pull the distinct seasons of each dataset to the driver to list the overlap.
pitstop_seasons = [row[0] for row in agg_df.select("Season").distinct().collect()]
safety_seasons = [row[0] for row in safety_with_circuit.select("Season").distinct().collect()]

print(f"Pitstop seasons: {sorted(pitstop_seasons)}")
print(f"Safety car seasons: {sorted(safety_seasons)}")
print(f"Overlapping seasons: {set(pitstop_seasons) & set(safety_seasons)}")

Season range in pitstop data:
+----------+----------+
|min_season|max_season|
+----------+----------+
|      2018|      2024|
+----------+----------+

Season range in safety car data:
+----------+----------+
|min_season|max_season|
+----------+----------+
|      1973|      2025|
+----------+----------+

Pitstop seasons: [2018, 2019, 2020, 2021, 2022, 2023, 2024]
Safety car seasons: [1973, 1993, 1994, 1995, 1996, 1997, 1998, 1999, 2000, 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022, 2023, 2024, 2025]
Overlapping seasons: {2018, 2019, 2020, 2021, 2022, 2023, 2024}


In [21]:
from pyspark.sql import functions as F

# Put the "Circuit" values of both datasets side by side to see why the join finds nothing.
print("Sample circuit names from pitstop data:")
agg_df.select("Circuit").distinct().show(10, truncate=False)

print("Sample circuit names from safety car data:")
safety_with_circuit.select("Circuit").distinct().show(10, truncate=False)

# Restrict the safety-car rows to the 2018-2024 seasons and count exact key matches.
safety_2018_2024 = safety_with_circuit.filter(F.col("Season").between(2018, 2024))
overlap_df = agg_df.join(safety_2018_2024, ["Season", "Circuit"], "inner")

print(f"Exact matches found: {overlap_df.count()}")

# Collect lower-cased names from both sides for a manual, case-insensitive comparison.
from pyspark.sql.functions import lower

circuit_norm = F.lower(F.col("Circuit")).alias("circuit_norm")

pitstop_circuits = [row[0] for row in agg_df.select(circuit_norm).distinct().collect()]
safety_circuits = [row[0] for row in safety_2018_2024.select(circuit_norm).distinct().collect()]

print(f"Pitstop circuits (2018-2024): {pitstop_circuits}")
print(f"Safety car circuits (2018-2024): {safety_circuits}")


Sample circuit names from pitstop data:
+------------------------------------------+
|Circuit                                   |
+------------------------------------------+
|Istanbul Park                             |
|Aut√É∆í√Ç¬≥dromo Internacional do Algarve |
|Albert Park Grand Prix Circuit            |
|Circuit Gilles Villeneuve                 |
|Suzuka Circuit                            |
|Aut√É∆í√Ç¬≥dromo Hermanos Rodr√É∆í√Ç¬≠guez|
|Miami International Autodrome             |
|Autodromo Nazionale di Monza              |
|Baku City Circuit                         |
|Silverstone Circuit                       |
+------------------------------------------+
only showing top 10 rows
Sample circuit names from safety car data:
+---------------------+
|Circuit              |
+---------------------+
|Sakhir Grand Prix    |
|S√£o Paulo Grand Prix|
|German Grand Prix    |
|Spanish Grand Prix   |
|Turkish Grand Prix   |
|Miami Grand Prix     |
|Brazilian Grand Prix |
|Argentine Grand Prix 

In [24]:
# Map each circuit name used in the pit-stop data to the race title used in the
# safety-car data (the text that follows the year in its "Race" column).
# NOTE(review): keys must equal the pit-stop "Circuit" values exactly; names with
# accented characters will not match if the pit-stop file holds them mis-encoded.
# NOTE(review): agg_df also carries a "Race Name" column - if its values use the
# same titles as the safety-car data, joining on it would avoid this hand-made map.
circuit_mapping = {
    "Suzuka Circuit": "Japanese Grand Prix",
    "Istanbul Park": "Turkish Grand Prix", 
    "Autodromo Nazionale di Monza": "Italian Grand Prix",
    "Circuit de Barcelona-Catalunya": "Spanish Grand Prix",
    "Circuit de Spa-Francorchamps": "Belgian Grand Prix",
    "Circuit de Monaco": "Monaco Grand Prix",
    "Red Bull Ring": "Austrian Grand Prix",
    "Yas Marina Circuit": "Abu Dhabi Grand Prix",
    "Bahrain International Circuit": "Bahrain Grand Prix",
    "Circuit Park Zandvoort": "Dutch Grand Prix",
    "Circuit Paul Ricard": "French Grand Prix",
    "Hungaroring": "Hungarian Grand Prix",
    "Silverstone Circuit": "British Grand Prix",
    "Circuit of the Americas": "United States Grand Prix",
    "Hockenheimring": "German Grand Prix",
    "Jeddah Corniche Circuit": "Saudi Arabian Grand Prix",
    "Marina Bay Street Circuit": "Singapore Grand Prix",
    "Circuit Gilles Villeneuve": "Canadian Grand Prix",
    "Miami International Autodrome": "Miami Grand Prix",
    "Autódromo José Carlos Pace": "São Paulo Grand Prix",
    "Albert Park Grand Prix Circuit": "Australian Grand Prix",
    "Baku City Circuit": "Azerbaijan Grand Prix",
    "Losail International Circuit": "Qatar Grand Prix",
    "Sochi Autodrom": "Russian Grand Prix"
}

mapping_df = spark.createDataFrame([(k, v) for k, v in circuit_mapping.items()], ["Circuit", "Race_Name"])

# Attach the mapped race title to every pit-stop row. Circuits missing from the
# mapping get a null Race_Name and therefore never match a safety-car row.
agg_df_with_race = agg_df.join(mapping_df, "Circuit", "left")

# The safety table's "Circuit" column holds the race title, so rename it to the
# join key. This must be defined before the join below; joining first fails
# with NameError on a fresh kernel.
safety_with_race = safety_with_circuit.withColumnRenamed("Circuit", "Race_Name")

# Left join keeps every pit-stop row; safety-car columns are null where nothing matched.
final_df = agg_df_with_race.join(
    safety_with_race, 
    ["Season", "Race_Name"], 
    "left"
)

# NOTE(review): this counts joined rows, not distinct races - every matching
# safety-car record produces its own copy of the pit-stop row.
matched_rows = final_df.filter(F.col('Cause').isNotNull())
print(f"Matches found: {matched_rows.count()}")
matched_rows.select("Season", "Circuit", "Race_Name", "Cause").show(10, truncate=False)

Matches found: 1940
+------+--------------+-------------------+--------------------+
|Season|Circuit       |Race_Name          |Cause               |
+------+--------------+-------------------+--------------------+
|2018  |Suzuka Circuit|Japanese Grand Prix|Debris from accident|
|2018  |Suzuka Circuit|Japanese Grand Prix|Debris from accident|
|2022  |Suzuka Circuit|Japanese Grand Prix|Stranded car/Rain   |
|2018  |Suzuka Circuit|Japanese Grand Prix|Debris from accident|
|2018  |Suzuka Circuit|Japanese Grand Prix|Debris from accident|
|2023  |Suzuka Circuit|Japanese Grand Prix|Debris from accident|
|2022  |Suzuka Circuit|Japanese Grand Prix|Stranded car/Rain   |
|2022  |Suzuka Circuit|Japanese Grand Prix|Stranded car/Rain   |
|2018  |Suzuka Circuit|Japanese Grand Prix|Debris from accident|
|2018  |Suzuka Circuit|Japanese Grand Prix|Debris from accident|
+------+--------------+-------------------+--------------------+
only showing top 10 rows


In [25]:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col, concat_ws

# Work on final_df (the joined table) rather than agg_df.
if "Pit_Lap" in final_df.columns:
    final_df = final_df.withColumn("Pit_Lap", col("Pit_Lap").cast(IntegerType()))

# Canonical ID: Season_Round_Abbreviation_PitLap (there is no Stint column, so Pit_Lap is used).
id_parts = [
    col("Season").cast(StringType()),
    col("Round").cast(StringType()),
    col("Abbreviation"),
    col("Pit_Lap").cast(StringType()),  # Using Pit_Lap instead of Stint
]
final_df = final_df.withColumn("pit2_id", concat_ws("_", *id_parts))

final_df.select("pit2_id", "Season", "Round", "Abbreviation", "Pit_Lap").show(5)

+-------------+------+-----+------------+-------+
|      pit2_id|Season|Round|Abbreviation|Pit_Lap|
+-------------+------+-----+------------+-------+
|2018_1_HAR_22|  2018|    1|         HAR|     22|
|2018_1_SAI_22|  2018|    1|         SAI|     22|
|2018_1_LEC_20|  2018|    1|         LEC|     20|
|2018_1_RIC_26|  2018|    1|         RIC|     26|
|2018_1_OCO_23|  2018|    1|         OCO|     23|
+-------------+------+-----+------------+-------+
only showing top 5 rows


In [26]:
# Write the final joined dataset as headered CSV, replacing any previous output.
final_df.write.csv(f"{OUT_DIR}/final_pitstop_analysis_data.csv", mode="overwrite", header=True)

print("SUCCESS: Final dataset saved!")
total_rows = final_df.count()
print(f"Dataset contains {total_rows} rows")
matched_total = final_df.filter(F.col('Cause').isNotNull()).count()
print(f"Safety car events matched: {matched_total}")

# Show final schema
final_df.printSchema()

# Release the Spark session; no further Spark work is possible after this line.
spark.stop()

SUCCESS: Final dataset saved!
Dataset contains 3381 rows
Safety car events matched: 1940
root
 |-- Season: integer (nullable = true)
 |-- Race_Name: string (nullable = true)
 |-- Circuit: string (nullable = true)
 |-- Round: integer (nullable = true)
 |-- Driver: string (nullable = true)
 |-- Constructor: string (nullable = true)
 |-- Race Name: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Abbreviation: string (nullable = true)
 |-- Air_Temp_C: string (nullable = true)
 |-- Track_Temp_C: string (nullable = true)
 |-- Humidity_%: string (nullable = true)
 |-- Wind_Speed_KMH: string (nullable = true)
 |-- AvgPitStopTime: string (nullable = true)
 |-- Pit_Lap: integer (nullable = true)
 |-- Pit_Time: string (nullable = true)
 |-- Tire Compound: string (nullable = true)
 |-- Stint Length: integer (nullable = true)
 |-- Race: string (nullable = true)
 |-- Cause: string (nullable = true)
 |-- 

Our data cleaning and processing is now complete, and the final dataset satisfies our requirements.