### Ingest lap_times folder

##### Step 1 - Read the CSV files using the Spark DataFrame reader API

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

In [0]:
# Explicit schema for the lap_times CSV files, one entry per column in file order.
# Supplying it up front avoids a schema-inference pass over the data.
# NOTE(review): Spark file sources typically read every column as nullable, so
# raceId's nullable=False is likely not enforced on read. Confirm before relying on it.
lap_times_schema = (
    StructType()
    .add("raceId", IntegerType(), False)
    .add("driverId", IntegerType(), True)
    .add("lap", IntegerType(), True)
    .add("position", IntegerType(), True)
    .add("time", StringType(), True)
    .add("milliseconds", IntegerType(), True)
)

In [0]:
# Read every CSV file in the lap_times folder using the explicit schema above.
# To read only a subset of files, pass a wildcard path instead of the folder, e.g.
#   "/mnt/formula1dl119/raw/lap_times/lap_times_split*.csv"
lap_times_df = (
    spark.read
    .csv("/mnt/formula1dl119/raw/lap_times", schema=lap_times_schema)
)

In [0]:
# Preview the raw lap times as read from the CSV files, before any columns are renamed
display(lap_times_df)

raceId,driverId,lap,position,time,milliseconds
841,20,1,1,1:38.109,98109
841,20,2,1,1:33.006,93006
841,20,3,1,1:32.713,92713
841,20,4,1,1:32.803,92803
841,20,5,1,1:32.342,92342
841,20,6,1,1:32.605,92605
841,20,7,1,1:32.502,92502
841,20,8,1,1:32.537,92537
841,20,9,1,1:33.240,93240
841,20,10,1,1:32.572,92572


In [0]:
# Count the rows read from all CSV files in the lap_times folder.
# NOTE(review): this only reports the combined total. Confirming that all five
# source files were ingested still requires comparing it to the expected total by hand.
lap_times_df.count()

490904

##### Step 2 - Rename columns and add a new column
1. Rename driverId and raceId to driver_id and race_id
1. Add ingestion_date with the current timestamp

In [0]:
from pyspark.sql.functions import current_timestamp

In [0]:
# Convert the ID columns to snake_case and stamp every row with the time of ingestion.
final_df = (
    lap_times_df
    .withColumnRenamed("raceId", "race_id")
    .withColumnRenamed("driverId", "driver_id")
    .withColumn("ingestion_date", current_timestamp())
)

In [0]:
# Preview the transformed data: race_id/driver_id renamed, ingestion_date added
display(final_df)

race_id,driver_id,lap,position,time,milliseconds,ingestion_date
841,20,1,1,1:38.109,98109,2024-05-08T17:26:45.488Z
841,20,2,1,1:33.006,93006,2024-05-08T17:26:45.488Z
841,20,3,1,1:32.713,92713,2024-05-08T17:26:45.488Z
841,20,4,1,1:32.803,92803,2024-05-08T17:26:45.488Z
841,20,5,1,1:32.342,92342,2024-05-08T17:26:45.488Z
841,20,6,1,1:32.605,92605,2024-05-08T17:26:45.488Z
841,20,7,1,1:32.502,92502,2024-05-08T17:26:45.488Z
841,20,8,1,1:32.537,92537,2024-05-08T17:26:45.488Z
841,20,9,1,1:33.240,93240,2024-05-08T17:26:45.488Z
841,20,10,1,1:32.572,92572,2024-05-08T17:26:45.488Z


##### Step 3 - Write the output to the processed container in Parquet format

In [0]:
# Persist the transformed lap times as Parquet, replacing any previous output at this path
final_df.write.parquet("/mnt/formula1dl119/processed/lap_times", mode="overwrite")

In [0]:
# Read the Parquet output back and display it to inspect what was written
display(spark.read.format("parquet").load("/mnt/formula1dl119/processed/lap_times"))

race_id,driver_id,lap,position,time,milliseconds,ingestion_date
67,14,26,13,1:25.802,85802,2024-05-08T17:26:46.007Z
67,14,27,13,1:25.338,85338,2024-05-08T17:26:46.007Z
67,14,28,13,1:25.395,85395,2024-05-08T17:26:46.007Z
67,14,29,12,1:26.191,86191,2024-05-08T17:26:46.007Z
67,14,30,11,1:25.439,85439,2024-05-08T17:26:46.007Z
67,14,31,10,1:25.375,85375,2024-05-08T17:26:46.007Z
67,14,32,12,1:28.219,88219,2024-05-08T17:26:46.007Z
67,14,33,13,1:49.156,109156,2024-05-08T17:26:46.007Z
67,14,34,13,1:25.128,85128,2024-05-08T17:26:46.007Z
67,14,35,13,1:25.351,85351,2024-05-08T17:26:46.007Z
