### Step 1 - Read the JSON file using the Spark DataFrame API

In [0]:
# Notebook parameter: name of the data source. Its value is written into the
# `data_source` column later in this notebook; it defaults to an empty string.
dbutils.widgets.text("p_data_source","")
v_data_source = dbutils.widgets.get("p_data_source")

In [0]:
# Notebook parameter: the dated raw-data folder to ingest, in YYYY-MM-DD form.
# Imported here (not in the imports cell further down) because this cell runs first.
from datetime import datetime

dbutils.widgets.text("p_file_date","2021-03-21")
v_file_date = dbutils.widgets.get("p_file_date")

# v_file_date is interpolated into the raw input path and stored in the
# file_date column, so reject anything that is not a calendar date up front.
# This gives a clear error instead of a missing-path failure, and stops values
# such as "../other" from escaping raw_folder_path.
try:
    datetime.strptime(v_file_date, "%Y-%m-%d")
except ValueError as err:
    raise ValueError(
        f"p_file_date must be a date in YYYY-MM-DD format, got {v_file_date!r}"
    ) from err

In [0]:
%run "../includes/configuration"

In [0]:
%run "../includes/common_functions"

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

In [0]:
# Schema of the nested `name` object in drivers.json: forename and surname,
# both nullable strings.
name_schema = (
    StructType()
    .add("forename", StringType(), True)
    .add("surname", StringType(), True)
)

In [0]:
# Explicit schema for drivers.json, so Spark does not have to infer types.
# `name` is the nested forename/surname struct defined above.
# NOTE(review): Spark file readers generally relax user-supplied fields to
# nullable, so driverId's non-null flag is likely not enforced at read time — confirm.
drivers_schema = (
    StructType()
    .add("driverId", IntegerType(), False)
    .add("driverRef", StringType(), True)
    .add("number", IntegerType(), True)
    .add("code", StringType(), True)
    .add("name", name_schema)
    .add("dob", DateType(), True)
    .add("nationality", StringType(), True)
    .add("url", StringType(), True)
)

In [0]:
# Load the drivers file for the selected file date, using the explicit schema.
drivers_df = spark.read.json(
    f"{raw_folder_path}/{v_file_date}/drivers.json",
    schema=drivers_schema,
)

### Step 2 - Rename columns and add new columns

In [0]:
# Switch the camelCase source identifiers to snake_case.
drivers_renamed_df = (
    drivers_df
    .withColumnRenamed("driverId", "driver_id")
    .withColumnRenamed("driverRef", "driver_ref")
)

In [0]:
from pyspark.sql.functions import current_timestamp, col, concat_ws, lit

In [0]:
# Add audit columns and flatten the name struct:
#   - ingestion_date: timestamp at processing time
#   - name: "forename surname" (concat_ws skips null parts)
#   - data_source / file_date: values of the notebook parameters
drivers_fullname_df = (
    drivers_renamed_df
    .withColumn("ingestion_date", current_timestamp())
    .withColumn("name", concat_ws(" ", col("name.forename"), col("name.surname")))
    .withColumn("data_source", lit(v_data_source))
    .withColumn("file_date", lit(v_file_date))
)

### Step 3 - Drop unwanted columns

In [0]:
# `url` is not carried into the processed table.
drivers_final_df = drivers_fullname_df.drop(col("url"))

### Step 4 - Write to the Delta table `f1_processed.drivers`

In [0]:
# Replace the contents of the managed Delta table with this run's drivers data.
(
    drivers_final_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("f1_processed.drivers")
)

In [0]:
# Return "Success" to the calling notebook/job. dbutils.notebook.exit() stops
# the run, so no cells belong after this one (the leftover `v_file_date`
# debug cell that used to follow it was dead code and has been removed).
dbutils.notebook.exit("Success")