%md

  
 | Data source: ADLS ==> /mnt/demodatabrick/formula1/races.csv
 | Output storage: ADLS ==> /mnt/demodatabrick/formula1/processed/races
 | File name: races.csv
 | File format: CSV
 | Purpose: Extract the races data from the source, clean it, and save it to the processed storage layer


In [0]:
%run "/Workspace/Projects/formula1/Data Ingestion/Prod/01-setup"

In [0]:
%run "/Workspace/Projects/formula1/Data Ingestion/Prod/02-common_functions"

 Read the CSV file using the Spark DataFrameReader API

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from pyspark.sql.functions import current_timestamp, to_timestamp, concat, lit, col

In [0]:
# Explicit schema for races.csv, so Spark applies these names/types instead of
# inferring them from the data.
# NOTE(review): Spark file sources generally read every column as nullable, so
# the nullable=False flags on raceId/circuitId are presumably not enforced at
# read time. TODO confirm before relying on them.
races_schema = StructType([
    StructField("raceId", IntegerType(), nullable=False),
    StructField("year", IntegerType(), nullable=True),
    StructField("round", IntegerType(), nullable=True),
    StructField("circuitId", IntegerType(), nullable=False),
    StructField("name", StringType(), nullable=True),
    StructField("date", DateType(), nullable=True),
    StructField("time", StringType(), nullable=True),
    StructField("url", StringType(), nullable=True),
])

In [0]:
# Load the raw races file. Column names come from the header row and column
# types come from races_schema.
races_df = spark.read.csv(
    "/mnt/demodatabrick/formula1/races.csv",
    header=True,
    schema=races_schema,
)


 Add the ingestion_date, race_timestamp and data_source columns to the DataFrame

In [0]:
# Lineage label for this load, supplied through the "p_data_source" notebook
# parameter. It defaults to an empty string when no parameter is passed.
dbutils.widgets.text("p_data_source", "")
v_data_source = dbutils.widgets.get("p_data_source")

# Add audit/derived columns:
#   ingestion_date - processing time, from current_timestamp().
#   race_timestamp - the "date" and "time" columns joined with a space and
#                    parsed with the pattern yyyy-MM-dd HH:mm:ss. concat()
#                    yields null when either input is null.
#                    NOTE(review): unparsable time values (e.g. placeholder
#                    strings) are expected to become null rather than fail.
#                    Confirm under the cluster's ANSI setting.
#   data_source    - the lineage label read above. It must be the variable
#                    value, not the quoted name "v_data_source".
races_with_timestamp_df = (
    races_df
    .withColumn("ingestion_date", current_timestamp())
    .withColumn(
        "race_timestamp",
        to_timestamp(concat(col("date"), lit(" "), col("time")), "yyyy-MM-dd HH:mm:ss"),
    )
    .withColumn("data_source", lit(v_data_source))
)


 Select the required columns and rename them to snake_case

In [0]:
# Keep only the columns needed downstream, renaming source names on the way.
# Each entry is (source column, output column), listed in output order.
# url, date, time and data_source are not carried forward.
races_output_columns = [
    ("raceId", "race_id"),
    ("year", "year_id"),
    ("round", "round"),
    ("circuitId", "circuit_id"),
    ("name", "name"),
    ("ingestion_date", "ingestion_date"),
    ("race_timestamp", "race_timestamp"),
]
races_select_df = races_with_timestamp_df.select(
    *[col(source).alias(target) for source, target in races_output_columns]
)

 Write the final output as Parquet, partitioned by year_id

In [0]:
# Persist the curated races as Parquet. Any existing output at the target path
# is replaced, and files are laid out in one sub-directory per year_id value.
(
    races_select_df.write
    .format("parquet")
    .mode("overwrite")
    .partitionBy("year_id")
    .save("/mnt/demodatabrick/formula1/processed/races")
)

In [0]:
# End the notebook and hand a status string back to the caller. If this
# notebook was started with dbutils.notebook.run, that call receives the string.
dbutils.notebook.exit("Succeed")