#### Step 1: Read the races CSV file into a DataFrame



In [0]:
# Declare the "p_source_file" text widget with an empty default value.
dbutils.widgets.text("p_source_file", "")

In [0]:
# Read the current value of the "p_source_file" widget; it is later stored
# in the data_source column.
v_source_file = dbutils.widgets.get("p_source_file")

In [0]:
%run "../includes/configuration"

##### Step 2: Define the schema for the DataFrame

In [0]:
from pyspark.sql.types import StringType, StructType, StructField , IntegerType , DateType



In [0]:
# Explicit schema applied when reading races.csv. Fields are listed in the
# order they are applied to the file's columns; every field is nullable.
race_schema = StructType(
    fields=[
        StructField(field_name, field_type, True)
        for field_name, field_type in (
            ("race_id", IntegerType()),
            ("year", IntegerType()),
            ("round", IntegerType()),
            ("circuitId", IntegerType()),
            ("name", StringType()),
            ("date", DateType()),
            ("time", StringType()),
        )
    ]
)

In [0]:
# Load races.csv from the raw folder with the explicit schema, treating the
# first line of the file as a header row.
# NOTE(review): raw_folder_path is presumably defined by the
# "../includes/configuration" notebook - confirm.
races_df = (
    spark.read
    .schema(race_schema)
    .option("header", "true")
    .csv(f"{raw_folder_path}/races.csv")
)

#### Step 3: Add a new column, race_timestamp, by concatenating the date and time columns

In [0]:
from pyspark.sql.functions import col, lit, to_timestamp, concat, current_timestamp

In [0]:
# Join the date and time columns with a single space and parse the result
# into one timestamp column named race_timestamp.
races_newcolumn_df = races_df.withColumn(
    "race_timestamp",
    to_timestamp(
        concat(col("date"), lit(" "), col("time")),
        "yyyy-MM-dd HH:mm:ss",
    ),
)

#### Step 4: Add a new column, ingestion_date, set to the current timestamp

In [0]:
# Stamp every row with the current timestamp ...
races_newcolumn_df = races_newcolumn_df.withColumn(
    "ingestion_date", current_timestamp()
)
# ... and with the source value read from the p_source_file widget.
races_newcolumn_df = races_newcolumn_df.withColumn(
    "data_source", lit(v_source_file)
)

In [0]:
# Build the final column set: rename year -> race_year and
# circuitId -> circuit_id, keep the remaining columns under their existing
# names, and leave out the raw date and time columns (race_timestamp is
# kept in their place).
# The circuitId reference uses the exact case declared in race_schema so the
# lookup does not depend on Spark's case-insensitive column resolution.
races_final_df = races_newcolumn_df.select(
    col("race_id"),
    col("year").alias("race_year"),
    col("round"),
    col("circuitId").alias("circuit_id"),
    col("name"),
    col("ingestion_date"),
    col("race_timestamp"),
    col("data_source"),
)

#### Step 5: Write the data to the destination as Parquet files

In [0]:
# Write the final DataFrame as Parquet under the processed folder, replacing
# whatever is already at that path.
# NOTE(review): the Step 6 cell overwrites this same path with a partitioned
# write, so this unpartitioned write looks redundant - confirm it is needed.
(
    races_final_df.write
    .mode("overwrite")
    .parquet(f"{processed_folder_path}/races/")
)

#### Step 6: Write the data partitioned by race year

In [0]:
# Write the final DataFrame as Parquet partitioned by race_year, replacing
# whatever is already at the destination path.
(
    races_final_df.write
    .mode("overwrite")
    .partitionBy("race_year")
    .parquet(f"{processed_folder_path}/races/")
)

In [0]:
# Read the written Parquet data back and display it.
display(
    spark.read.parquet(f"{processed_folder_path}/races/")
)

In [0]:
# Exit the notebook, passing "Success" as its exit value.
dbutils.notebook.exit("Success")