### Reading the races.csv file

In [0]:
# Run the sibling notebook that (judging by its name) mounts the storage
# before anything is read from /mnt. The second argument is 60 —
# presumably the timeout in seconds; confirm against the dbutils docs.
mount_notebook_path = '../Mounting_storage'
dbutils.notebook.run(mount_notebook_path, 60)

In [0]:
from pyspark.sql.types import StructType,StructField,IntegerType,DoubleType,StringType,DateType

In [0]:
# Explicit schema for races.csv: one StructField per column, in file order.
# Only raceId is declared non-nullable; every other field keeps the default
# nullable=True.
races_schema = StructType([
    StructField("raceId", IntegerType(), nullable=False),
    StructField("year", IntegerType()),
    StructField("round", IntegerType()),
    StructField("circuitId", IntegerType()),
    StructField("name", StringType()),
    StructField("date", DateType()),
    StructField("time", StringType()),
    StructField("url", StringType()),
])

In [0]:
# Load the races CSV using the explicit schema; the first line of the file
# is treated as a header row.
races_csv_path = '/mnt/blobstorage/races.csv'
df_races = spark.read.csv(races_csv_path, schema=races_schema, header=True)
display(df_races.head(5))

### Adding ingestion date and race timestamp

In [0]:
from pyspark.sql.functions import current_timestamp,current_date,col, to_timestamp, concat_ws
# Join the date and time columns with a single space so they can be parsed
# together as one timestamp.
race_datetime_text = concat_ws(' ', col('date'), col('time'))

# Add the ingestion timestamp and the combined race timestamp.
df_races = (
    df_races
    .withColumn("ingestion_date", current_timestamp())
    .withColumn("race_timestamp", to_timestamp(race_datetime_text, 'yyyy-MM-dd HH:mm:ss'))
)
display(df_races.head(5))

### Selecting required columns and renaming them

In [0]:
# Keep only the columns needed downstream and rename the camelCase / generic
# ones to snake_case. The url column is intentionally not selected.
# Column names are spelled exactly as in races_schema ("raceId", not
# "raceid") so the select does not depend on case-insensitive resolution.
races_select_df = df_races.select(
    col("raceId").alias("race_id"),
    col("year").alias("race_year"),
    col("round"),
    col("circuitId").alias("circuit_id"),
    col("name").alias("race_name"),
    col("date"),
    col("time"),
    col("ingestion_date"),
    col("race_timestamp"),
)
display(races_select_df.head(5))

### Writing to Parquet

In [0]:
# Write the selected columns as Parquet, replacing any existing output at
# this path.
races_select_df.write.parquet('/mnt/blobstorage/races', mode="overwrite")

In [0]:
%fs
ls /mnt/blobstorage/races

#### Writing the races data partitioned by race year

In [0]:
# Write the same data to the same path again, this time partitioned by
# race_year; "overwrite" replaces whatever is already stored there.
races_select_df.write.parquet(
    '/mnt/blobstorage/races',
    mode="overwrite",
    partitionBy="race_year",
)

In [0]:
%fs
ls /mnt/blobstorage/races