In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import row_number, floor, split, col, when, sum
from pyspark.sql.window import Window
from pyspark.sql.functions import col, lag, lead, when, avg
import re
import os
import shutil
from concurrent.futures import ThreadPoolExecutor

Status:
- Data transformation/cleaning: done
- Data splitting: in progress
- Data loading to AWS S3 or Azure Data Lake: next

In [2]:
# Start (or reuse) a local Spark session with 8g of driver and executor memory.
spark = (
    SparkSession.builder
    .appName("TelemetryProcessing")
    .config("spark.driver.memory", "8g")
    .config("spark.executor.memory", "8g")
    .getOrCreate()
)

25/04/27 21:50:43 WARN Utils: Your hostname, gesser-ASUS-TUF-Gaming-F15-FX506HM-FX506HM resolves to a loopback address: 127.0.1.1; using 192.168.1.56 instead (on interface wlo1)
25/04/27 21:50:43 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/27 21:50:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Experimenting with data transformation

Race `15525718781239906836`: Hanoi

In [64]:
# Load one race's telemetry (Hanoi). The header row supplies column names and
# inferSchema lets Spark pick the column types from the data.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("f1-2020-race-data/TelemetryData_15525718781239906836.csv")
)

df.show(5)

                                                                                

+-----------+---------------+-----------+--------------+--------------+--------------+--------------+--------------+--------------+----------------+----------------+----------------+--------------+--------------+--------------+-------------+------------------+--------------+-------+-------+-------+-----+--------+-----+-----+------+----+---------+---+-----------------+-----------------------+---------------------+-----------------+--------------------+-----------+-------+----------------+----------+-----------------+--------------------+------------------+--------------------+--------------+-------------+-----------------------+-----------------------+------------------+-----------+--------------+-------------+-----------+-------------+---------+------+------------+------------+
|sessionTime|frameIdentifier|pilot_index|worldPositionX|worldPositionY|worldPositionZ|worldVelocityX|worldVelocityY|worldVelocityZ|worldForwardDirX|worldForwardDirY|worldForwardDirZ|worldRightDirX|worldRightDirY

In [73]:
# Distinct values of actualTyreCompound
df.select(col("actualTyreCompound")).distinct().collect()

                                                                                

[Row(actualTyreCompound=1),
 Row(actualTyreCompound=2),
 Row(actualTyreCompound=0)]

In [33]:
# Inspect rows where speed is missing
df.where(col("speed").isNull()).show(100)



+-----------+---------------+-----------+--------------+--------------+--------------+--------------+--------------+--------------+----------------+----------------+----------------+--------------+--------------+--------------+-------------+------------------+--------------+--------+--------+--------+-----+--------+-----+-----+------+----+---------+----+-----------------+-------+----------------+----------+-----------------+------------------+--------------+-------------+-----------------------+-----------------------+------------------+-----------+--------------+-------------+-----------+-------------+---------+------+------------+------------+--------------------------+--------------------------+--------------------------+--------------------------+------------------------+------------------------+------------------------+------------------------+----------------+----------------+----------------+----------------+------------+------------+------------+------------+--------------+----

                                                                                

In [16]:
# Calculating NULL values per column: the isNull flag cast to int is 1 for a
# missing cell, so summing it counts the nulls in each column.
# (`sum` here is pyspark.sql.functions.sum, imported at the top.)
df.select([sum(col(name).isNull().cast("int")).alias(name) for name in df.columns]).show()




+-----------+---------------+-----------+--------------+--------------+--------------+--------------+--------------+--------------+----------------+----------------+----------------+--------------+--------------+--------------+-------------+------------------+--------------+---+-----+----+-----+--------+-----+-----+------+----+---------+---+-----------------+-------+----------------+----------+-----------------+------------------+--------------+-------------+-----------------------+-----------------------+------------------+-----------+--------------+-------------+-----------+-------------+---------+------+------------+------------+--------------------------+--------------------------+--------------------------+--------------------------+------------------------+------------------------+------------------------+------------------------+----------------+----------------+----------------+----------------+------------+------------+------------+------------+--------------+--------------+--

                                                                                

In [38]:
# Print the schema of df (column names, types, nullability). In top-to-bottom
# order this runs before the transformation cell, i.e. on the inferred raw schema.
df.printSchema()

root
 |-- sessionTime: double (nullable = true)
 |-- frameIdentifier: integer (nullable = true)
 |-- pilot_index: integer (nullable = true)
 |-- worldPositionX: double (nullable = true)
 |-- worldPositionY: double (nullable = true)
 |-- worldPositionZ: double (nullable = true)
 |-- worldVelocityX: double (nullable = true)
 |-- worldVelocityY: double (nullable = true)
 |-- worldVelocityZ: double (nullable = true)
 |-- worldForwardDirX: integer (nullable = true)
 |-- worldForwardDirY: integer (nullable = true)
 |-- worldForwardDirZ: integer (nullable = true)
 |-- worldRightDirX: integer (nullable = true)
 |-- worldRightDirY: integer (nullable = true)
 |-- worldRightDirZ: integer (nullable = true)
 |-- gForceLateral: double (nullable = true)
 |-- gForceLongitudinal: double (nullable = true)
 |-- gForceVertical: double (nullable = true)
 |-- yaw: double (nullable = true)
 |-- pitch: double (nullable = true)
 |-- roll: double (nullable = true)
 |-- speed: double (nullable = true)
 |-- throt

In [65]:
# Turn the pit/result status strings into booleans, encode the tyre compound,
# split the packed per-tyre columns, then drop the packed/status columns.
#
# inPitArea: True whenever pitStatus has any value.
# pitting:   True only when pitStatus == "pitting" (null status -> False).
# active:    True whenever resultStatus has any value.
df = (
    df
    .withColumn("inPitArea", when(col("pitStatus").isNull(), False).otherwise(True))
    .withColumn("pitting", when(col("pitStatus") == "pitting", True).otherwise(False))
    .withColumn("active", when(col("resultStatus").isNull(), False).otherwise(True))
)

# Tyre compound name -> integer code (soft=0, medium=1, hard=2); any other
# value becomes null because there is no otherwise().
df = df.withColumn(
    "actualTyreCompound",
    when(col("actualTyreCompound") == "soft", 0)
    .when(col("actualTyreCompound") == "medium", 1)
    .when(col("actualTyreCompound") == "hard", 2),
)

# Tyre and brake columns hold four values formatted "FL/FR/RL/RR".
split_bt = split(col("brakesTemperature"), "/")
split_tst = split(col("tyresSurfaceTemperature"), "/")
split_tit = split(col("tyresInnerTemperature"), "/")
split_tp = split(col("tyresPressure"), "/")
split_st = split(col("surfaceType"), "/")
split_tw = split(col("tyresWear"), "/")
split_td = split(col("tyresDamage"), "/")

# One column per wheel: doubles for measurements, ints for surface type codes.
df = (
    df
    .withColumn("FL_tyresSurfaceTemperature", split_tst.getItem(0).cast("double"))
    .withColumn("FR_tyresSurfaceTemperature", split_tst.getItem(1).cast("double"))
    .withColumn("RL_tyresSurfaceTemperature", split_tst.getItem(2).cast("double"))
    .withColumn("RR_tyresSurfaceTemperature", split_tst.getItem(3).cast("double"))
    .withColumn("FL_tyresInnerTemperature", split_tit.getItem(0).cast("double"))
    .withColumn("FR_tyresInnerTemperature", split_tit.getItem(1).cast("double"))
    .withColumn("RL_tyresInnerTemperature", split_tit.getItem(2).cast("double"))
    .withColumn("RR_tyresInnerTemperature", split_tit.getItem(3).cast("double"))
    .withColumn("FL_tyresPressure", split_tp.getItem(0).cast("double"))
    .withColumn("FR_tyresPressure", split_tp.getItem(1).cast("double"))
    .withColumn("RL_tyresPressure", split_tp.getItem(2).cast("double"))
    .withColumn("RR_tyresPressure", split_tp.getItem(3).cast("double"))
    .withColumn("FL_tyresWear", split_tw.getItem(0).cast("double"))
    .withColumn("FR_tyresWear", split_tw.getItem(1).cast("double"))
    .withColumn("RL_tyresWear", split_tw.getItem(2).cast("double"))
    .withColumn("RR_tyresWear", split_tw.getItem(3).cast("double"))
    .withColumn("FL_tyresDamage", split_td.getItem(0).cast("double"))
    .withColumn("FR_tyresDamage", split_td.getItem(1).cast("double"))
    .withColumn("RL_tyresDamage", split_td.getItem(2).cast("double"))
    .withColumn("RR_tyresDamage", split_td.getItem(3).cast("double"))
    .withColumn("FL_brakesTemperature", split_bt.getItem(0).cast("double"))
    .withColumn("FR_brakesTemperature", split_bt.getItem(1).cast("double"))
    .withColumn("RL_brakesTemperature", split_bt.getItem(2).cast("double"))
    .withColumn("RR_brakesTemperature", split_bt.getItem(3).cast("double"))
    .withColumn("FL_surfaceType", split_st.getItem(0).cast("int"))
    .withColumn("FR_surfaceType", split_st.getItem(1).cast("int"))
    .withColumn("RL_surfaceType", split_st.getItem(2).cast("int"))
    .withColumn("RR_surfaceType", split_st.getItem(3).cast("int"))
)

# The packed strings and raw status columns are now redundant.
df = df.drop(
    "resultStatus", "pitStatus",
    "brakesTemperature", "tyresSurfaceTemperature", "tyresInnerTemperature",
    "tyresPressure", "surfaceType", "tyresWear", "tyresDamage",
)


df.show(n=5)

+-----------+---------------+-----------+--------------+--------------+--------------+--------------+--------------+--------------+----------------+----------------+----------------+--------------+--------------+--------------+-------------+------------------+--------------+-------+-------+-------+-----+--------+-----+-----+------+----+---------+---+-----------------+-------+----------------+----------+-----------------+------------------+--------------+-------------+-----------------------+-----------------------+------------------+-----------+--------------+-------------+-----------+-------------+------+------------+---------+-------+------+--------------------------+--------------------------+--------------------------+--------------------------+------------------------+------------------------+------------------------+------------------------+----------------+----------------+----------------+----------------+------------+------------+------------+------------+--------------+------

In [67]:
# Sort rows by pilot, then by frame number.
df = df.sort("pilot_index", "frameIdentifier")

In [68]:
# Fill numeric gaps with the average of the nearest known values on either side.
def interpolate(df, *columns):
    """Fill nulls in ``columns`` from the surrounding non-null values.

    Rows are grouped by ``pilot_index`` and ordered by ``frameIdentifier``.
    A null cell gets the average of the nearest non-null value before it
    and the nearest non-null value after it, so runs of consecutive nulls
    are filled as well. At the start or end of a pilot's series, where only
    one side has a value, that value is used on its own. A column that is
    null for the whole pilot stays null.

    Args:
        df: Spark DataFrame with ``pilot_index`` and ``frameIdentifier``.
        *columns: names of numeric columns to fill.

    Returns:
        A new DataFrame with the listed columns filled (as doubles, like the
        previous average-based fill); all other columns unchanged and in
        their original order.

    Raises:
        ValueError: if a requested column is not in ``df``.
    """
    missing = [name for name in columns if name not in df.columns]
    if missing:
        raise ValueError(f"interpolate: unknown column(s) {missing}")

    from pyspark.sql.functions import coalesce, last

    by_pilot = Window.partitionBy("pilot_index")
    # "Every earlier row" frames. The next value is found by reversing the
    # order instead of using a frame unbounded on the following side, which
    # Spark recomputes for each row.
    earlier_rows = by_pilot.orderBy(col("frameIdentifier").asc()).rowsBetween(Window.unboundedPreceding, -1)
    later_rows = by_pilot.orderBy(col("frameIdentifier").desc()).rowsBetween(Window.unboundedPreceding, -1)

    filled = {}
    for column in columns:
        prev_value = last(col(column), ignorenulls=True).over(earlier_rows)
        next_value = last(col(column), ignorenulls=True).over(later_rows)
        estimate = coalesce((prev_value + next_value) / 2, prev_value, next_value)
        filled[column] = when(col(column).isNull(), estimate).otherwise(col(column)).alias(column)

    # One select so window expressions sharing a frame are evaluated together
    # instead of adding a window step per column.
    return df.select([filled.get(name, col(name)) for name in df.columns])

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import last, col
import sys

# forwardpass function: carry the last known value of each column forward
# within a pilot's frames (used for categorical columns).
def forwardpass(df, *columns):
    """Replace each null in ``columns`` with the most recent non-null value.

    Rows are grouped by ``pilot_index`` and ordered by ``frameIdentifier``.
    The frame runs from the pilot's first row to the current row, so nulls
    before the first known value stay null.
    """
    history = (
        Window.partitionBy("pilot_index")
        .orderBy("frameIdentifier")
        .rowsBetween(Window.unboundedPreceding, Window.currentRow)
    )
    for name in columns:
        df = df.withColumn(name, last(col(name), ignorenulls=True).over(history))
    return df

# backwardpass function: fill each null with the next known value that follows
# it within a pilot's frames (mirror image of forwardpass).
def backwardpass(df, *columns):
    """Replace each null in ``columns`` with the next non-null value.

    Rows are grouped by ``pilot_index`` and visited in *descending*
    ``frameIdentifier`` order. The frame runs from the pilot's last frame up to
    the current row, so ``last(..., ignorenulls=True)`` picks the closest later
    value. (``last`` over a current-row..end frame in ascending order would
    return the pilot's final value instead of the next one.) Nulls after the
    last known value stay null.
    """
    reversed_history = (
        Window.partitionBy("pilot_index")
        .orderBy(col("frameIdentifier").desc())
        .rowsBetween(Window.unboundedPreceding, Window.currentRow)
    )

    for column in columns:
        df = df.withColumn(column, last(col(column), ignorenulls=True).over(reversed_history))

    return df

In [70]:
# categorical data: carry the last known value forward per pilot
df = forwardpass(
    df,
    "FL_surfaceType", "FR_surfaceType", "RL_surfaceType", "RR_surfaceType",
    "pitLimiterStatus", "actualTyreCompound", "drs", "gear", "ersDeployMode", "fuelMix",
)


In [71]:
# numerical data

# Fill gaps in every continuous channel, including the per-wheel columns
# produced by the split step.
df = interpolate(
    df,
    "speed", "throttle", "steer", "brake", "clutch",
    "engineRPM", "engineTemperature",
    "fuelInTank", "fuelRemainingLaps",
    "ersStoreEnergy", "ersHarvestedThisLapMGUK", "ersHarvestedThisLapMGUH", "ersDeployedThisLap",
    "FL_tyresSurfaceTemperature", "FR_tyresSurfaceTemperature", "RL_tyresSurfaceTemperature", "RR_tyresSurfaceTemperature",
    "FL_tyresInnerTemperature", "FR_tyresInnerTemperature", "RL_tyresInnerTemperature", "RR_tyresInnerTemperature",
    "FL_tyresPressure", "FR_tyresPressure", "RL_tyresPressure", "RR_tyresPressure",
    "FL_tyresWear", "FR_tyresWear", "RL_tyresWear", "RR_tyresWear",
    "FL_tyresDamage", "FR_tyresDamage", "RL_tyresDamage", "RR_tyresDamage",
    "FL_brakesTemperature", "FR_brakesTemperature", "RL_brakesTemperature", "RR_brakesTemperature",
)

In [72]:
# Calculating NULL values per column after imputation: the isNull flag cast
# to int is 1 for a missing cell, so summing it counts the remaining nulls.
# (`sum` here is pyspark.sql.functions.sum, imported at the top.)
df.select([sum(col(name).isNull().cast("int")).alias(name) for name in df.columns]).show()



+-----------+---------------+-----------+--------------+--------------+--------------+--------------+--------------+--------------+----------------+----------------+----------------+--------------+--------------+--------------+-------------+------------------+--------------+---+-----+----+-----+--------+-----+-----+------+----+---------+---+-----------------+-------+----------------+----------+-----------------+------------------+--------------+-------------+-----------------------+-----------------------+------------------+-----------+--------------+-------------+-----------+-------------+------+------------+---------+-------+------+--------------------------+--------------------------+--------------------------+--------------------------+------------------------+------------------------+------------------------+------------------------+----------------+----------------+----------------+----------------+------------+------------+------------+------------+--------------+--------------+

                                                                                

In [16]:
# Distinct resultStatus values.
# NOTE(review): the transformation cell drops resultStatus, so in top-to-bottom
# order this raises; it only works on the raw frame.
df.select(col("resultStatus")).distinct().collect()

[Row(resultStatus='active'), Row(resultStatus=None)]

In [17]:
# Share of all rows (in %) represented by the hard-coded count 97150.
# NOTE(review): presumably 97150 is a null count read off an earlier output —
# confirm which column and compute it instead of hard-coding it.
100 * 97150 / df.count()

8.597040812714594

In [None]:
# Disabled cell: the chunking/export code is wrapped in a string literal, so
# running it only echoes the text. NOTE(review): `window_spec` is not defined
# at notebook top level; the live version of this logic is datasetDev below.
'''# Add row numbers within each pilot group
df = df.withColumn("row_num", row_number().over(window_spec))

# Create chunk IDs (200 frames per chunk)
df = df.withColumn("chunk_id", floor((col("row_num") - 1) / 200))

# Save as partitioned Parquet files
df.write.partitionBy("pilot_index", "chunk_id").parquet("output_chunks")'''


In [None]:
# Disabled cell: a string literal, not a call — running it only echoes the text.
'''df.show(5)'''

In [None]:
def cleanDataSet(df):
    """Apply the notebook's cleaning transformation to a raw telemetry frame.

    - inPitArea / active: True when pitStatus / resultStatus is non-null.
    - pitting: True only when pitStatus == "pitting".
    - actualTyreCompound: "soft"/"medium"/"hard" -> 0/1/2 (anything else -> null).
    - Each "FL/FR/RL/RR" packed column is split into four per-wheel columns
      (double, or int for surfaceType); the packed and status columns are dropped.

    Args:
        df: raw telemetry Spark DataFrame as read from the CSV.

    Returns:
        The transformed DataFrame.
    """
    # Status strings -> booleans (the new columns contain no nulls).
    df = (
        df
        .withColumn("inPitArea", when(col("pitStatus").isNull(), False).otherwise(True))
        .withColumn("pitting", when(col("pitStatus") == "pitting", True).otherwise(False))
        .withColumn("active", when(col("resultStatus").isNull(), False).otherwise(True))
    )

    # mapping tyre compounds to integers instead of string
    df = df.withColumn(
        "actualTyreCompound",
        when(col("actualTyreCompound") == "soft", 0)
        .when(col("actualTyreCompound") == "medium", 1)
        .when(col("actualTyreCompound") == "hard", 2),
    )

    # Splitting tyre and brake columns with this format: FL/FR/RL/RR
    split_bt = split(col("brakesTemperature"), "/")
    split_tst = split(col("tyresSurfaceTemperature"), "/")
    split_tit = split(col("tyresInnerTemperature"), "/")
    split_tp = split(col("tyresPressure"), "/")
    split_st = split(col("surfaceType"), "/")
    split_tw = split(col("tyresWear"), "/")
    split_td = split(col("tyresDamage"), "/")

    # Create new columns for each tyre
    df = (
        df
        .withColumn("FL_tyresSurfaceTemperature", split_tst.getItem(0).cast("double"))
        .withColumn("FR_tyresSurfaceTemperature", split_tst.getItem(1).cast("double"))
        .withColumn("RL_tyresSurfaceTemperature", split_tst.getItem(2).cast("double"))
        .withColumn("RR_tyresSurfaceTemperature", split_tst.getItem(3).cast("double"))
        .withColumn("FL_tyresInnerTemperature", split_tit.getItem(0).cast("double"))
        .withColumn("FR_tyresInnerTemperature", split_tit.getItem(1).cast("double"))
        .withColumn("RL_tyresInnerTemperature", split_tit.getItem(2).cast("double"))
        .withColumn("RR_tyresInnerTemperature", split_tit.getItem(3).cast("double"))
        .withColumn("FL_tyresPressure", split_tp.getItem(0).cast("double"))
        .withColumn("FR_tyresPressure", split_tp.getItem(1).cast("double"))
        .withColumn("RL_tyresPressure", split_tp.getItem(2).cast("double"))
        .withColumn("RR_tyresPressure", split_tp.getItem(3).cast("double"))
        .withColumn("FL_tyresWear", split_tw.getItem(0).cast("double"))
        .withColumn("FR_tyresWear", split_tw.getItem(1).cast("double"))
        .withColumn("RL_tyresWear", split_tw.getItem(2).cast("double"))
        .withColumn("RR_tyresWear", split_tw.getItem(3).cast("double"))
        .withColumn("FL_tyresDamage", split_td.getItem(0).cast("double"))
        .withColumn("FR_tyresDamage", split_td.getItem(1).cast("double"))
        .withColumn("RL_tyresDamage", split_td.getItem(2).cast("double"))
        .withColumn("RR_tyresDamage", split_td.getItem(3).cast("double"))
        .withColumn("FL_brakesTemperature", split_bt.getItem(0).cast("double"))
        .withColumn("FR_brakesTemperature", split_bt.getItem(1).cast("double"))
        .withColumn("RL_brakesTemperature", split_bt.getItem(2).cast("double"))
        .withColumn("RR_brakesTemperature", split_bt.getItem(3).cast("double"))
        .withColumn("FL_surfaceType", split_st.getItem(0).cast("int"))
        .withColumn("FR_surfaceType", split_st.getItem(1).cast("int"))
        .withColumn("RL_surfaceType", split_st.getItem(2).cast("int"))
        .withColumn("RR_surfaceType", split_st.getItem(3).cast("int"))
    )

    df = df.drop("resultStatus", "pitStatus", "brakesTemperature", "tyresSurfaceTemperature", "tyresInnerTemperature", "tyresPressure", "surfaceType", "tyresWear", "tyresDamage")
    return df


In [7]:
def datasetDev(dataFilePath, chunk_size=200):
    """Split one telemetry CSV into fixed-size per-pilot chunks on disk.

    Reads ``dataFilePath`` with the global ``spark`` session, numbers each
    pilot's rows by ``frameIdentifier``, groups them into chunks of
    ``chunk_size`` frames and writes CSV files (with a header row) under
    ``telemetrydata/<gp_id>``, partitioned by ``pilot_index`` and
    ``chunk_id``. Earlier output for the same race is overwritten.

    Args:
        dataFilePath: path to a ``..._<digits>.csv`` telemetry file; the
            digits in the *file name* are used as the race id.
        chunk_size: number of frames per chunk (default 200).

    Raises:
        ValueError: if the file name contains no ``_<digits>`` id.
    """
    # Look only at the file name so an underscore+digits in a directory name
    # cannot be mistaken for the race id.
    file_name = os.path.basename(dataFilePath)
    match = re.search(r'_(\d+)', file_name)
    if match is None:
        raise ValueError(f"cannot find a _<digits> race id in {dataFilePath!r}")
    gp_id = match.group(1)

    df = spark.read.csv(dataFilePath, header=True, inferSchema=True)

    # The window orders rows itself, so no global sort is needed beforehand.
    window_spec = Window.partitionBy("pilot_index").orderBy("frameIdentifier")
    df = df.withColumn("row_num", row_number().over(window_spec))

    # Create chunk IDs (chunk_size frames per chunk)
    df = df.withColumn("chunk_id", floor((col("row_num") - 1) / chunk_size))

    # Order rows by output partition and frame so each chunk file is written
    # in frame order without the writer re-sorting by partition columns.
    df = df.sortWithinPartitions("pilot_index", "chunk_id", "frameIdentifier")

    # Save as partitioned CSV files (one directory per pilot and chunk)
    df.write.mode("overwrite").partitionBy("pilot_index", "chunk_id").option("header", "true").csv("telemetrydata/" + gp_id)

In [8]:
# Split every telemetry CSV in the raw dataset folder into per-pilot chunks
for file_name in os.listdir("f1-2020-race-data"):
    if "Telemetry" not in file_name:
        continue
    datasetDev(f"f1-2020-race-data/{file_name}")

25/04/01 21:27:35 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [9]:

# Stop Spark session
# NOTE(review): later cells (the NumPy/torch, show and groupBy cells) still use
# `df`, which needs a live session; in top-to-bottom order they fail after this
# stop until the session is recreated further down.
spark.stop()

In [25]:
import os
import re

base_dir = "telemetrydata"
parquet_data = []  # despite the name, this collects the CSV part files datasetDev writes
pattern = r"\d+"

# Walk <base_dir>/<gp_id>/<pilot dir>/<chunk dir>/<file>.csv, where the pilot and
# chunk directory names carry their number (presumably "pilot_index=<n>" and
# "chunk_id=<m>" from Spark's partitioned write). Each record is
# [gp_id, pilot number, chunk number, file path] so the copy can be named after them.
for gp_dir in os.scandir(base_dir):
    if not gp_dir.is_dir():
        continue
    for pilot_dir in os.scandir(gp_dir.path):
        if not pilot_dir.is_dir():
            continue
        for chunk_dir in os.scandir(pilot_dir.path):
            if not chunk_dir.is_dir():
                continue
            for entry in os.scandir(chunk_dir.path):
                if entry.is_file() and entry.name.endswith(".csv"):
                    parquet_data.append([
                        gp_dir.name,
                        re.findall(pattern, str(pilot_dir.name))[0],
                        re.findall(pattern, str(chunk_dir.name))[0],
                        entry.path,
                    ])

In [26]:
wearhouse = "CSVTelemetryWearhouse"

def copy_file(file_path):
    """Copy one chunk file into the warehouse directory.

    ``file_path`` is a record from ``parquet_data``:
    [gp_id, pilot number, chunk number, source path]. The copy is named
    ``<gp_id>_<pilot>_<chunk>.csv`` inside ``wearhouse``.
    """
    gp_id, pilot, chunk, source = file_path[:4]
    shutil.copy2(source, wearhouse + "/" + gp_id + "_" + pilot + "_" + chunk + ".csv")


os.makedirs(wearhouse, exist_ok=True)
with ThreadPoolExecutor(max_workers=4) as executor:
    # Executor.map only re-raises a worker's exception when that result is
    # read. Consuming the results turns a failed copy into a visible error
    # instead of silently losing the file.
    list(executor.map(copy_file, parquet_data))

#shutil.rmtree("/home/gesser/Documents/f1tele/telemetrydata")

In [9]:
# Number of entries now in the warehouse directory
print(len(list(os.scandir(wearhouse))))

396


In [5]:
# Delete the CSV warehouse directory and everything copied into it above.
# NOTE(review): irreversible — keep this out of Restart & Run All or guard it.
shutil.rmtree('CSVTelemetryWearhouse')

----------------------------------------------------------------------------------------------------------------------------

In [17]:
import torch
import numpy as np

# Pull the FL_surfaceType column to the driver as a NumPy array. Each collected
# Row becomes one length-1 row of the array. Surface types are already integers
# after the split/cast step.
data = np.asarray(df.select(col("FL_surfaceType")).collect())


                                                                                

In [57]:
# First 20 rows of front-left surface type per pilot/frame, cells not truncated
df.select("pilot_index", "frameIdentifier", "FL_surfaceType").show(n=20, truncate=False)


[Stage 171:>                                                      (0 + 16) / 16]

+-----------+---------------+--------------+
|pilot_index|frameIdentifier|FL_surfaceType|
+-----------+---------------+--------------+
|0          |0              |0             |
|0          |1              |0             |
|0          |2              |0             |
|0          |3              |0             |
|0          |4              |0             |
|0          |5              |0             |
|0          |6              |0             |
|0          |7              |0             |
|0          |8              |0             |
|0          |9              |0             |
|0          |10             |0             |
|0          |12             |0             |
|0          |13             |0             |
|0          |14             |0             |
|0          |15             |0             |
|0          |16             |0             |
|0          |17             |0             |
|0          |18             |0             |
|0          |19             |0             |
|0        

                                                                                

In [18]:
# Frequency of each front-left surface type code (one row per distinct value)
df.groupBy(col("FL_surfaceType")).count().show()


[Stage 42:>                                                         (0 + 4) / 4]

+--------------+-------+
|FL_surfaceType|  count|
+--------------+-------+
|             1|  54604|
|             7|    209|
|             2|  12095|
|             0|1063132|
+--------------+-------+



                                                                                

In [None]:

# Convert to a torch tensor of integer class indices. Surface types are
# categorical codes, and embedding layers take integer (long) index tensors,
# so the dtype is torch.long rather than float64.
X = torch.tensor(data, dtype=torch.long)

-----------------------------------------

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import row_number, floor, split, col, when, sum
from pyspark.sql.window import Window
from pyspark.sql.functions import col, lag, lead, when, avg, last
import re
import os
import sys
import shutil
from concurrent.futures import ThreadPoolExecutor


class dataset:
    """One race's telemetry DataFrame plus the cleaning and export steps.

    Intended order: ``transform()`` -> ``imputation()`` -> ``datasetDev(path)``.
    ``interpolate`` and ``forwardpass`` are static helpers that take and return
    a DataFrame, so they work both as ``self.interpolate(df, ...)`` and
    ``dataset.interpolate(df, ...)``.
    """

    def __init__(self, spark: "SparkSession", df):
        self.spark = spark  # session the frame belongs to
        self.df = df        # telemetry DataFrame, replaced by each step

    def show(self, n=5):
        """Print the first ``n`` rows of the current DataFrame."""
        self.df.show(n=n)

    # Interpolate missing values by replacing each null with the average
    # of the preceding and following non-null values
    # --> used for numerical values
    @staticmethod
    def interpolate(df, *columns):
        """Fill nulls in ``columns`` from the surrounding non-null values.

        Rows are grouped by ``pilot_index`` and ordered by ``frameIdentifier``.
        A null cell gets the average of the nearest non-null value before it
        and the nearest non-null value after it, so runs of consecutive nulls
        are filled too. At a series edge, where only one side has a value,
        that value is used. A column null for the whole pilot stays null.

        Raises:
            ValueError: if a requested column is not in ``df``.
        """
        missing = [name for name in columns if name not in df.columns]
        if missing:
            raise ValueError(f"interpolate: unknown column(s) {missing}")

        from pyspark.sql.functions import coalesce

        by_pilot = Window.partitionBy("pilot_index")
        # "Every earlier row" frames; the next value is found by reversing the
        # order instead of using a frame unbounded on the following side,
        # which Spark recomputes for each row.
        earlier_rows = by_pilot.orderBy(col("frameIdentifier").asc()).rowsBetween(Window.unboundedPreceding, -1)
        later_rows = by_pilot.orderBy(col("frameIdentifier").desc()).rowsBetween(Window.unboundedPreceding, -1)

        filled = {}
        for column in columns:
            prev_value = last(col(column), ignorenulls=True).over(earlier_rows)
            next_value = last(col(column), ignorenulls=True).over(later_rows)
            estimate = coalesce((prev_value + next_value) / 2, prev_value, next_value)
            filled[column] = when(col(column).isNull(), estimate).otherwise(col(column)).alias(column)

        # One select so window expressions sharing a frame are evaluated together.
        return df.select([filled.get(name, col(name)) for name in df.columns])

    # forwardpass function, to replace each null value with the preceding
    # non-null value
    # --> used for categorical values
    @staticmethod
    def forwardpass(df, *columns):
        """Replace each null in ``columns`` with the most recent non-null value.

        Nulls before a pilot's first known value stay null.
        """
        history = (
            Window.partitionBy("pilot_index")
            .orderBy("frameIdentifier")
            .rowsBetween(Window.unboundedPreceding, Window.currentRow)
        )
        for column in columns:
            df = df.withColumn(column, last(col(column), ignorenulls=True).over(history))
        return df

    def transform(self):
        """Convert status/compound columns and split the per-tyre columns.

        Replaces ``self.df`` with a frame that is sorted by pilot and frame, has
        boolean ``inPitArea``/``pitting``/``active`` columns, integer
        ``actualTyreCompound`` codes (soft=0, medium=1, hard=2, other -> null),
        and one column per wheel for each "FL/FR/RL/RR" packed column; the
        packed and status string columns are dropped.
        """
        frame = self.df.orderBy(["pilot_index", "frameIdentifier"])

        # Status strings -> booleans (the new columns contain no nulls), and
        # tyre compound names -> integer codes.
        frame = (
            frame
            .withColumn("inPitArea", when(col("pitStatus").isNull(), False).otherwise(True))
            .withColumn("pitting", when(col("pitStatus") == "pitting", True).otherwise(False))
            .withColumn("active", when(col("resultStatus").isNull(), False).otherwise(True))
            .withColumn(
                "actualTyreCompound",
                when(col("actualTyreCompound") == "soft", 0)
                .when(col("actualTyreCompound") == "medium", 1)
                .when(col("actualTyreCompound") == "hard", 2),
            )
        )

        # Tyre and brake columns hold four values formatted "FL/FR/RL/RR".
        split_bt = split(col("brakesTemperature"), "/")
        split_tst = split(col("tyresSurfaceTemperature"), "/")
        split_tit = split(col("tyresInnerTemperature"), "/")
        split_tp = split(col("tyresPressure"), "/")
        split_st = split(col("surfaceType"), "/")
        split_tw = split(col("tyresWear"), "/")
        split_td = split(col("tyresDamage"), "/")

        # Create new columns for each tyre
        frame = (
            frame
            .withColumn("FL_tyresSurfaceTemperature", split_tst.getItem(0).cast("double"))
            .withColumn("FR_tyresSurfaceTemperature", split_tst.getItem(1).cast("double"))
            .withColumn("RL_tyresSurfaceTemperature", split_tst.getItem(2).cast("double"))
            .withColumn("RR_tyresSurfaceTemperature", split_tst.getItem(3).cast("double"))
            .withColumn("FL_tyresInnerTemperature", split_tit.getItem(0).cast("double"))
            .withColumn("FR_tyresInnerTemperature", split_tit.getItem(1).cast("double"))
            .withColumn("RL_tyresInnerTemperature", split_tit.getItem(2).cast("double"))
            .withColumn("RR_tyresInnerTemperature", split_tit.getItem(3).cast("double"))
            .withColumn("FL_tyresPressure", split_tp.getItem(0).cast("double"))
            .withColumn("FR_tyresPressure", split_tp.getItem(1).cast("double"))
            .withColumn("RL_tyresPressure", split_tp.getItem(2).cast("double"))
            .withColumn("RR_tyresPressure", split_tp.getItem(3).cast("double"))
            .withColumn("FL_tyresWear", split_tw.getItem(0).cast("double"))
            .withColumn("FR_tyresWear", split_tw.getItem(1).cast("double"))
            .withColumn("RL_tyresWear", split_tw.getItem(2).cast("double"))
            .withColumn("RR_tyresWear", split_tw.getItem(3).cast("double"))
            .withColumn("FL_tyresDamage", split_td.getItem(0).cast("double"))
            .withColumn("FR_tyresDamage", split_td.getItem(1).cast("double"))
            .withColumn("RL_tyresDamage", split_td.getItem(2).cast("double"))
            .withColumn("RR_tyresDamage", split_td.getItem(3).cast("double"))
            .withColumn("FL_brakesTemperature", split_bt.getItem(0).cast("double"))
            .withColumn("FR_brakesTemperature", split_bt.getItem(1).cast("double"))
            .withColumn("RL_brakesTemperature", split_bt.getItem(2).cast("double"))
            .withColumn("RR_brakesTemperature", split_bt.getItem(3).cast("double"))
            .withColumn("FL_surfaceType", split_st.getItem(0).cast("int"))
            .withColumn("FR_surfaceType", split_st.getItem(1).cast("int"))
            .withColumn("RL_surfaceType", split_st.getItem(2).cast("int"))
            .withColumn("RR_surfaceType", split_st.getItem(3).cast("int"))
        )

        self.df = frame.drop("resultStatus", "pitStatus", "brakesTemperature", "tyresSurfaceTemperature", "tyresInnerTemperature", "tyresPressure", "surfaceType", "tyresWear", "tyresDamage")

    # applying the interpolation and forward passing functions after transformation
    def imputation(self):
        """Fill nulls: carry categorical values forward, interpolate numeric ones.

        Uses the per-wheel columns created by ``transform()``, so run that first.
        """
        self.df = self.forwardpass(self.df, "FL_surfaceType", "FR_surfaceType", "RL_surfaceType", "RR_surfaceType", "pitLimiterStatus", "actualTyreCompound", "drs", "gear", "ersDeployMode", "fuelMix")
        self.df = self.interpolate(self.df, "speed", "throttle", "steer", "brake", "clutch", "engineRPM", "engineTemperature", "fuelInTank", "fuelRemainingLaps", "ersStoreEnergy", "ersHarvestedThisLapMGUK", "ersHarvestedThisLapMGUH", "ersDeployedThisLap", "FL_tyresSurfaceTemperature","FR_tyresSurfaceTemperature","RL_tyresSurfaceTemperature","RR_tyresSurfaceTemperature","FL_tyresInnerTemperature","FR_tyresInnerTemperature","RL_tyresInnerTemperature","RR_tyresInnerTemperature","FL_tyresPressure","FR_tyresPressure","RL_tyresPressure","RR_tyresPressure","FL_tyresWear","FR_tyresWear","RL_tyresWear","RR_tyresWear","FL_tyresDamage","FR_tyresDamage","RL_tyresDamage","RR_tyresDamage","FL_brakesTemperature","FR_brakesTemperature","RL_brakesTemperature","RR_brakesTemperature")

    def datasetDev(self, dataFilePath, chunk_size=200):
        """Write the current frame as per-pilot chunks of ``chunk_size`` frames.

        The race id is the ``_<digits>`` part of ``dataFilePath``'s file name.
        Output goes to ``telemetrydata/<gp_id>`` as CSV with a header row,
        partitioned by ``pilot_index`` and ``chunk_id``, overwriting earlier
        output. ``self.df`` itself is not modified.

        Raises:
            ValueError: if the file name contains no ``_<digits>`` id.
        """
        file_name = os.path.basename(dataFilePath)
        match = re.search(r'_(\d+)', file_name)
        if match is None:
            raise ValueError(f"cannot find a _<digits> race id in {dataFilePath!r}")
        gp_id = match.group(1)

        window_spec = Window.partitionBy("pilot_index").orderBy("frameIdentifier")
        chunked = (
            self.df
            .withColumn("row_num", row_number().over(window_spec))
            # Create chunk IDs (chunk_size frames per chunk)
            .withColumn("chunk_id", floor((col("row_num") - 1) / chunk_size))
            # Order rows by output partition and frame so each chunk file is
            # written in frame order without the writer re-sorting.
            .sortWithinPartitions("pilot_index", "chunk_id", "frameIdentifier")
        )

        # Save as partitioned CSV files (one directory per pilot and chunk)
        chunked.write.mode("overwrite").partitionBy("pilot_index", "chunk_id").option("header", "true").csv("telemetrydata/" + gp_id)

In [2]:
# Start (or reuse) a local Spark session with 8g of driver and executor memory.
spark = (
    SparkSession.builder
    .appName("TelemetryProcessing")
    .config("spark.driver.memory", "8g")
    .config("spark.executor.memory", "8g")
    .getOrCreate()
)

25/05/02 11:46:28 WARN Utils: Your hostname, gesser-ASUS-TUF-Gaming-F15-FX506HM-FX506HM resolves to a loopback address: 127.0.1.1; using 192.168.1.17 instead (on interface wlo1)
25/05/02 11:46:28 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/02 11:46:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


25/05/02 11:46:40 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [3]:
import kagglehub

# Download the F1 2020 race-data dataset (kagglehub stores it under its local
# cache directory) and report where the files are.
path = kagglehub.dataset_download("coni57/f1-2020-race-data")

print(f"Path to dataset files: {path}")

Path to dataset files: /home/gesser/.cache/kagglehub/datasets/coni57/f1-2020-race-data/versions/26


In [13]:
fileList = os.listdir(path)

# Wrap every telemetry CSV in the dataset folder in a `dataset` object.
classes = [
    dataset(spark, spark.read.csv(path + "/" + file_name, header=True, inferSchema=True))
    for file_name in fileList
    if 'Telemetry' in file_name
]

                                                                                

In [14]:
# Preview the first rows of every loaded race
for race in classes:
    race.show()

25/05/02 12:13:42 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-----------+---------------+-----------+--------------+--------------+--------------+--------------+--------------+--------------+----------------+----------------+----------------+--------------+--------------+--------------+-------------+------------------+--------------+-------+-------+-------+-----+--------+-----+-----+------+----+---------+---+-----------------+-----------------------+---------------------+-----------------+--------------------+-----------+-------+----------------+----------+-----------------+--------------------+------------------+--------------------+--------------+-------------+-----------------------+-----------------------+------------------+-----------+--------------+-------------+-----------+-------------+---------+------+------------+------------+
|sessionTime|frameIdentifier|pilot_index|worldPositionX|worldPositionY|worldPositionZ|worldVelocityX|worldVelocityY|worldVelocityZ|worldForwardDirX|worldForwardDirY|worldForwardDirZ|worldRightDirX|worldRightDirY