### Ingest drivers.json file

In [0]:
dbutils.widgets.text("p_data_src", "")
v_data_src=dbutils.widgets.get("p_data_src")

In [0]:
dbutils.widgets.text("p_file_date", "2021-03-21")
v_file_date=dbutils.widgets.get("p_file_date")

In [0]:
%run "../includes/common_functions"

In [0]:
%run "../includes/configuration"

Step 1 - reading the json file using dfreader api


In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DataType

In [0]:
from pyspark.sql.functions import current_timestamp, lit, col,concat

In [0]:
name_schema= StructType(fields=[StructField("forename", StringType(), True),
                                 StructField("surname", StringType(), True)])
drivers_schema = StructType(fields=[StructField("driverId", IntegerType(), False),
                                 StructField("driverRef", StringType(), True),
                                 StructField("number", IntegerType(), True),
                                 StructField("code", StringType(), True),
                                 StructField("name", name_schema),
                                 StructField("dob", StringType(), True),
                                 StructField("nationality", StringType(), True),
                                 StructField("url", StringType(), True)])

In [0]:
drivers_df = spark.read\
.schema(drivers_schema)\
.json(f"{raw_folder_path}/{v_file_date}/drivers.json")
display(drivers_df)


driverId,driverRef,number,code,name,dob,nationality,url
1,hamilton,44.0,HAM,"List(Lewis, Hamilton)",1985-01-07,British,http://en.wikipedia.org/wiki/Lewis_Hamilton
2,heidfeld,,HEI,"List(Nick, Heidfeld)",1977-05-10,German,http://en.wikipedia.org/wiki/Nick_Heidfeld
3,rosberg,6.0,ROS,"List(Nico, Rosberg)",1985-06-27,German,http://en.wikipedia.org/wiki/Nico_Rosberg
4,alonso,14.0,ALO,"List(Fernando, Alonso)",1981-07-29,Spanish,http://en.wikipedia.org/wiki/Fernando_Alonso
5,kovalainen,,KOV,"List(Heikki, Kovalainen)",1981-10-19,Finnish,http://en.wikipedia.org/wiki/Heikki_Kovalainen
6,nakajima,,NAK,"List(Kazuki, Nakajima)",1985-01-11,Japanese,http://en.wikipedia.org/wiki/Kazuki_Nakajima
7,bourdais,,BOU,"List(Sébastien, Bourdais)",1979-02-28,French,http://en.wikipedia.org/wiki/S%C3%A9bastien_Bourdais
8,raikkonen,7.0,RAI,"List(Kimi, Räikkönen)",1979-10-17,Finnish,http://en.wikipedia.org/wiki/Kimi_R%C3%A4ikk%C3%B6nen
9,kubica,88.0,KUB,"List(Robert, Kubica)",1984-12-07,Polish,http://en.wikipedia.org/wiki/Robert_Kubica
10,glock,,GLO,"List(Timo, Glock)",1982-03-18,German,http://en.wikipedia.org/wiki/Timo_Glock


Step-2 - Renaming columns and add new columns
1. driverId renamed to driver_id
2. driverRef renamed to driver_ref
3. name added with concatenated forename and surname

In [0]:
drivers_renamed_df = drivers_df.withColumnRenamed("driverId", "driver_id")\
.withColumnRenamed("driverRef", "driver_ref")\
.withColumn("name",concat(col("name.forename"), lit(" "), col("name.surname")))\
.withColumn("data_source",lit(v_data_src))\
.withColumn("file_date",lit(v_file_date))


In [0]:
drivers_final_df1= add_ingestion_date(drivers_renamed_df)

In [0]:
display(drivers_final_df1)

driver_id,driver_ref,number,code,name,dob,nationality,url,data_source,file_date,ingestion_date
1,hamilton,44.0,HAM,Lewis Hamilton,1985-01-07,British,http://en.wikipedia.org/wiki/Lewis_Hamilton,Ergast API,2021-04-18,2025-08-05T11:43:21.709328Z
2,heidfeld,,HEI,Nick Heidfeld,1977-05-10,German,http://en.wikipedia.org/wiki/Nick_Heidfeld,Ergast API,2021-04-18,2025-08-05T11:43:21.709328Z
3,rosberg,6.0,ROS,Nico Rosberg,1985-06-27,German,http://en.wikipedia.org/wiki/Nico_Rosberg,Ergast API,2021-04-18,2025-08-05T11:43:21.709328Z
4,alonso,14.0,ALO,Fernando Alonso,1981-07-29,Spanish,http://en.wikipedia.org/wiki/Fernando_Alonso,Ergast API,2021-04-18,2025-08-05T11:43:21.709328Z
5,kovalainen,,KOV,Heikki Kovalainen,1981-10-19,Finnish,http://en.wikipedia.org/wiki/Heikki_Kovalainen,Ergast API,2021-04-18,2025-08-05T11:43:21.709328Z
6,nakajima,,NAK,Kazuki Nakajima,1985-01-11,Japanese,http://en.wikipedia.org/wiki/Kazuki_Nakajima,Ergast API,2021-04-18,2025-08-05T11:43:21.709328Z
7,bourdais,,BOU,Sébastien Bourdais,1979-02-28,French,http://en.wikipedia.org/wiki/S%C3%A9bastien_Bourdais,Ergast API,2021-04-18,2025-08-05T11:43:21.709328Z
8,raikkonen,7.0,RAI,Kimi Räikkönen,1979-10-17,Finnish,http://en.wikipedia.org/wiki/Kimi_R%C3%A4ikk%C3%B6nen,Ergast API,2021-04-18,2025-08-05T11:43:21.709328Z
9,kubica,88.0,KUB,Robert Kubica,1984-12-07,Polish,http://en.wikipedia.org/wiki/Robert_Kubica,Ergast API,2021-04-18,2025-08-05T11:43:21.709328Z
10,glock,,GLO,Timo Glock,1982-03-18,German,http://en.wikipedia.org/wiki/Timo_Glock,Ergast API,2021-04-18,2025-08-05T11:43:21.709328Z


Step 3 - Dropping the unwanted columns
1. name.forename
2. name.surname
3. url

In [0]:
drivers_final_df= drivers_final_df1.drop(col("url"))
drivers_final_df.write.mode("overwrite").format("delta").saveAsTable("f1_processed.drivers")

In [0]:
%sql
select * from f1_processed.drivers;

driver_id,driver_ref,number,code,name,dob,nationality,data_source,file_date,ingestion_date
1,hamilton,44.0,HAM,Lewis Hamilton,1985-01-07,British,Ergast API,2021-04-18,2025-08-05T11:43:22.482316Z
2,heidfeld,,HEI,Nick Heidfeld,1977-05-10,German,Ergast API,2021-04-18,2025-08-05T11:43:22.482316Z
3,rosberg,6.0,ROS,Nico Rosberg,1985-06-27,German,Ergast API,2021-04-18,2025-08-05T11:43:22.482316Z
4,alonso,14.0,ALO,Fernando Alonso,1981-07-29,Spanish,Ergast API,2021-04-18,2025-08-05T11:43:22.482316Z
5,kovalainen,,KOV,Heikki Kovalainen,1981-10-19,Finnish,Ergast API,2021-04-18,2025-08-05T11:43:22.482316Z
6,nakajima,,NAK,Kazuki Nakajima,1985-01-11,Japanese,Ergast API,2021-04-18,2025-08-05T11:43:22.482316Z
7,bourdais,,BOU,Sébastien Bourdais,1979-02-28,French,Ergast API,2021-04-18,2025-08-05T11:43:22.482316Z
8,raikkonen,7.0,RAI,Kimi Räikkönen,1979-10-17,Finnish,Ergast API,2021-04-18,2025-08-05T11:43:22.482316Z
9,kubica,88.0,KUB,Robert Kubica,1984-12-07,Polish,Ergast API,2021-04-18,2025-08-05T11:43:22.482316Z
10,glock,,GLO,Timo Glock,1982-03-18,German,Ergast API,2021-04-18,2025-08-05T11:43:22.482316Z


In [0]:
dbutils.notebook.exit("Success")