#### Ingest races.csv file

#### Step 1 - Read the CSV file using the Spark DataFrameReader API

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

In [0]:
# Explicit schema for races.csv, built one column at a time.
# Each add() call takes (column name, data type, nullable); only race_id is
# declared non-nullable.
races_schema = (
    StructType()
    .add("race_id", IntegerType(), False)
    .add("year", IntegerType(), True)
    .add("round", IntegerType(), True)
    .add("circuitId", IntegerType(), True)
    .add("name", StringType(), True)
    .add("date", DateType(), True)
    .add("time", StringType(), True)
    .add("url", StringType(), True)
)

In [0]:
# Load the raw races CSV with the explicit schema instead of schema inference;
# the "header" option tells the reader that the first line holds column names.
races_df = (
    spark.read
    .schema(races_schema)
    .option("header", "true")
    .csv("/mnt/f1dlportfolio/raw/races.csv")
)

In [0]:
# Render the freshly read DataFrame in the notebook output for visual inspection.
display(races_df)

In [0]:
from pyspark.sql.functions import current_timestamp, to_timestamp, lit, concat, col

In [0]:
# Add two derived columns to the raw races data:
#   * the load time, taken from current_timestamp()
#   * race_timestamp, parsed from "<date> <time>" using the given pattern
# NOTE: the first column name contains a space ("ingestion _date"); the later
# select references it by this exact name, so it is kept verbatim here.
races_with_timestamp_df = (
    races_df
    .withColumn("ingestion _date", current_timestamp())
    .withColumn(
        "race_timestamp",
        to_timestamp(
            concat(col('date'), lit(' '), col('time')),
            'yyyy-MM-dd HH:mm:ss',
        ),
    )
)

In [0]:
# Render the DataFrame with the added timestamp columns for visual inspection.
display(races_with_timestamp_df)

#### Step 3 - Select only the required columns and rename them as needed

In [0]:
# Keep only the columns needed in the processed output and normalise their
# names to snake_case (year -> race_year, circuitId -> circuit_id).
#
# The upstream column is literally named "ingestion _date" (with a space).
# It is aliased to "ingestion_date" here so the written dataset has a valid,
# consistently named column: older Spark versions refuse to write Parquet
# when a column name contains a space, and newer ones would persist the
# awkward name as-is.
races_selected_df = races_with_timestamp_df.select(
    col('race_id'),
    col('year').alias("race_year"),
    col('round'),
    col('circuitId').alias("circuit_id"),
    col('name'),
    col('ingestion _date').alias("ingestion_date"),
    col('race_timestamp'),
)

In [0]:
# Render the selected/renamed columns for visual inspection before writing.
display(races_selected_df)

#### Step 4 - Write the output to the processed container in Parquet format

In [0]:
# Persist the selected columns as Parquet; "overwrite" replaces any output
# already present at the target path.
races_selected_df.write.parquet(
    "/mnt/f1dlportfolio/processed/races.parquet",
    mode="overwrite",
)

In [0]:
%fs
ls /mnt/f1dlportfolio/processed/races.parquet

In [0]:
# Read the Parquet output back from the processed path and render it in the
# notebook output.
display(
    spark.read.parquet("/mnt/f1dlportfolio/processed/races.parquet")
)