
### Ingest races.csv file


##### Step 1 - Read the CSV file using the Spark DataFrame reader API and set the schema


In [0]:
# display(dbutils.fs.ls("/mnt/jumayelformula1dl/raw"))

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

In [0]:
# Explicit schema for races.csv, passed to the reader rather than relying on
# schema inference. raceId is the only field declared non-nullable; date is typed
# as a date while time is kept as a plain string.
races_schema = StructType(fields=[
    StructField("raceId", IntegerType(), nullable=False),
    StructField("year", IntegerType(), nullable=True),
    StructField("round", IntegerType(), nullable=True),
    StructField("circuitId", IntegerType(), nullable=True),
    StructField("name", StringType(), nullable=True),
    StructField("date", DateType(), nullable=True),
    StructField("time", StringType(), nullable=True),
    StructField("url", StringType(), nullable=True),
])

In [0]:
# Load the raw races CSV using the explicit schema; the first line of the file
# is treated as a header row.
races_df = (
    spark.read
    .option("header", True)
    .schema(races_schema)
    .csv("/mnt/jumayelformula1dl/raw/races.csv")
)


##### Step 2 - Add race_timestamp to the dataframe


In [0]:
from pyspark.sql.functions import col, to_timestamp, concat, lit, current_timestamp

# Join the date and time columns with a single space and parse the result as a
# "yyyy-MM-dd HH:mm:ss" timestamp, stored in a new race_timestamp column.
races_df_with_race_timestamp = races_df.withColumn(
    "race_timestamp",
    to_timestamp(
        concat(col("date"), lit(" "), col("time")),
        "yyyy-MM-dd HH:mm:ss",
    ),
)



##### Step 3 - Select required columns

In [0]:
# Keep the identifying columns plus race_timestamp; date, time and url are not
# carried forward.
races_selected_df = races_df_with_race_timestamp.select(
    "raceId",
    "year",
    "round",
    "circuitId",
    "name",
    "race_timestamp",
)


##### Step 4 - Rename columns as required

In [0]:
# Switch the camelCase source column names to snake_case; year becomes race_year.
races_df_renamed = (
    races_selected_df
    .withColumnRenamed("raceId", "race_id")
    .withColumnRenamed("year", "race_year")
    .withColumnRenamed("circuitId", "circuit_id")
)


##### Step 5 - Add ingestion_date column

In [0]:
# Add an ingestion_date column holding the current timestamp.
races_df_final = races_df_renamed.withColumn(
    "ingestion_date",
    current_timestamp(),
)


##### Step 6 - Write the DataFrame as parquet files, partitioned by race_year, to the processed directory

In [0]:
# Write the final DataFrame as parquet, partitioned into one sub-directory per
# race_year, replacing any previous output at the target.
# The target is the absolute mount path (leading "/"): the notebook reads the
# result back from "/mnt/jumayelformula1dl/processed/races/", and a relative
# "mnt/..." path would be resolved against the filesystem's working directory,
# which is not guaranteed to be the same location.
(
    races_df_final.write
    .mode("overwrite")
    .partitionBy("race_year")
    .parquet("/mnt/jumayelformula1dl/processed/races/")
)

In [0]:
# Read the processed parquet output back from the mount and render it, as a
# visual check of what was written.
df = spark.read.format("parquet").load("/mnt/jumayelformula1dl/processed/races/")
display(df)

race_id,round,circuit_id,name,race_timestamp,ingestion_date,race_year
1053,2,21,Emilia Romagna Grand Prix,2021-04-18T13:00:00.000+0000,2023-09-23T17:41:31.166+0000,2021
1052,1,3,Bahrain Grand Prix,2021-03-28T15:00:00.000+0000,2023-09-23T17:41:31.166+0000,2021
1051,21,1,Australian Grand Prix,2021-11-21T06:00:00.000+0000,2023-09-23T17:41:31.166+0000,2021
1054,3,20,TBC,,2023-09-23T17:41:31.166+0000,2021
1055,4,4,Spanish Grand Prix,2021-05-09T13:00:00.000+0000,2023-09-23T17:41:31.166+0000,2021
1056,5,6,Monaco Grand Prix,2021-05-23T13:00:00.000+0000,2023-09-23T17:41:31.166+0000,2021
1057,6,73,Azerbaijan Grand Prix,2021-06-06T12:00:00.000+0000,2023-09-23T17:41:31.166+0000,2021
1058,7,7,Canadian Grand Prix,2021-06-13T18:00:00.000+0000,2023-09-23T17:41:31.166+0000,2021
1059,8,34,French Grand Prix,2021-06-27T13:00:00.000+0000,2023-09-23T17:41:31.166+0000,2021
1060,9,70,Austrian Grand Prix,2021-07-04T13:00:00.000+0000,2023-09-23T17:41:31.166+0000,2021
