## Ingest the races.csv file

In [0]:
# Declare a text widget named p_data_source (default: empty string) and read its
# current value. v_data_source is used later as the literal value of the
# data_source column.
dbutils.widgets.text("p_data_source", "")
v_data_source = dbutils.widgets.get("p_data_source")

In [0]:
%run "../includes/configurations"

In [0]:
%run "../includes/common_functions"

In [0]:
# List the contents of the raw folder so the available source files can be inspected.
raw_folder_listing = dbutils.fs.ls(f"{raw_folder_path}")
display(raw_folder_listing)

**Step 1: Create a schema using StructType and StructField**

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, DateType, TimestampType

# Column specifications for races.csv as (name, Spark type, nullable), in file order.
_races_column_specs = [
    ("raceId", IntegerType(), False),
    ("year", IntegerType(), True),
    ("round", IntegerType(), True),
    ("circuitId", IntegerType(), True),
    ("name", StringType(), True),
    ("date", DateType(), True),
    ("time", StringType(), True),
    ("url", StringType(), True),
]

# Explicit schema applied when reading the CSV (no schema inference).
races_schema = StructType(
    fields=[
        StructField(column_name, column_type, is_nullable)
        for column_name, column_type, is_nullable in _races_column_specs
    ]
)

In [0]:
# Read races.csv from the raw folder using the explicit schema; the "header"
# option is enabled so the first line of the file is treated as column names.
races_df = (
    spark.read
    .option("header", True)
    .schema(races_schema)
    .csv(f"{raw_folder_path}/races.csv")
)

# Preview a few rows of the loaded data.
display(races_df.limit(5))

In [0]:
# printSchema() writes the schema tree to standard output and returns None,
# so it is called directly rather than being passed to display().
races_df.printSchema()

**Step 2: Add the race timestamp, data source and ingestion date columns, then select only the required columns**

In [0]:
from pyspark.sql.functions import current_timestamp, to_timestamp, col, lit, concat

# Join the date and time columns into a single "<date> <time>" string expression.
race_datetime_text = concat(col("date"), lit(" "), col("time"))

# Add race_timestamp (parsed from the combined string) and data_source
# (the value read from the p_data_source widget, as a literal column).
races_with_timestamp_df = (
    races_df
    .withColumn("race_timestamp", to_timestamp(race_datetime_text, 'yyyy-MM-dd HH:mm:ss'))
    .withColumn("data_source", lit(v_data_source))
)

In [0]:
# ingestion_date() is not defined in this notebook; presumably it comes from one
# of the %run includes above (TODO confirm). The select further down reads an
# `ingestion_date` column, so the helper is expected to return the DataFrame
# with that column added.
races_with_ingestion_date = ingestion_date(races_with_timestamp_df)

In [0]:
# Preview a few rows after the extra columns have been added.
display(races_with_ingestion_date.limit(5))

In [0]:
# Project the columns to persist, renaming the camelCase source columns to
# snake_case. data_source is included (appended last so the existing column
# order is unchanged) so that the value supplied through the p_data_source
# widget is actually written out; it was previously added upstream and then
# silently dropped by this select.
races_selected_df = races_with_ingestion_date.select(
    col("raceId").alias("race_id"),
    col("year").alias("race_year"),
    col("round"),
    col("circuitId").alias("circuit_id"),
    col("name"),
    col("ingestion_date"),
    col("race_timestamp"),
    col("data_source"),
)

# Preview a few rows of the final projection.
races_selected_df.limit(5).display()

**Step 3: Write the DataFrame in Parquet format, partitioned by race_year**

In [0]:
# Save the result as the table f1_processed.races in Parquet format,
# overwriting any existing data and partitioning the output by race_year.
(
    races_selected_df.write
    .mode('overwrite')
    .partitionBy('race_year')
    .format("parquet")
    .saveAsTable("f1_processed.races")
)

In [0]:
# Read the Parquet output back from the processed folder as a check.
# NOTE(review): assumes the f1_processed.races table is stored under
# {processed_folder_path}/races; confirm against the database location.
races_output_path = f'{processed_folder_path}/races'
races_final_df = spark.read.parquet(races_output_path)

display(races_final_df.limit(5))

In [0]:
%sql
-- Query the table written above to check that it is readable through SQL.
SELECT * FROM f1_processed.races

In [0]:
# End the notebook run and return "Success" as its exit value to whatever invoked it.
dbutils.notebook.exit("Success")