#### Step 1. Read the JSON file using the Spark DataFrame reader

In [0]:
# Spark DDL schema string for constructors.json: comma-separated "column TYPE" pairs.
constructors_schema = ", ".join([
    "constructorId INT",
    "constructorRef STRING",
    "name STRING",
    "nationality STRING",
    "url STRING",
])

In [0]:
# Load constructors.json with the explicit schema rather than letting Spark infer types.
constructor_df = (
    spark.read
    .format("json")
    .schema(constructors_schema)
    .load("dbfs:/FileStore/shared_uploads/antony.prince@thoughtworks.com/constructors.json")
)

In [0]:
# Preview the constructors DataFrame as loaded from JSON.
display(constructor_df)

constructorId,constructorRef,name,nationality,url
1,mclaren,McLaren,British,http://en.wikipedia.org/wiki/McLaren
2,bmw_sauber,BMW Sauber,German,http://en.wikipedia.org/wiki/BMW_Sauber
3,williams,Williams,British,http://en.wikipedia.org/wiki/Williams_Grand_Prix_Engineering
4,renault,Renault,French,http://en.wikipedia.org/wiki/Renault_in_Formula_One
5,toro_rosso,Toro Rosso,Italian,http://en.wikipedia.org/wiki/Scuderia_Toro_Rosso
6,ferrari,Ferrari,Italian,http://en.wikipedia.org/wiki/Scuderia_Ferrari
7,toyota,Toyota,Japanese,http://en.wikipedia.org/wiki/Toyota_Racing
8,super_aguri,Super Aguri,Japanese,http://en.wikipedia.org/wiki/Super_Aguri_F1
9,red_bull,Red Bull,Austrian,http://en.wikipedia.org/wiki/Red_Bull_Racing
10,force_india,Force India,Indian,http://en.wikipedia.org/wiki/Racing_Point_Force_India


In [0]:
# Print the schema Spark applied to the constructors DataFrame.
constructor_df.printSchema()

root
 |-- constructorId: integer (nullable = true)
 |-- constructorRef: string (nullable = true)
 |-- name: string (nullable = true)
 |-- nationality: string (nullable = true)
 |-- url: string (nullable = true)



#### Step 2. Drop unwanted columns from the DataFrame

In [0]:
from pyspark.sql.functions import col,current_timestamp

In [0]:
# Remove the url column by name; the remaining columns are kept unchanged.
constructor_dropped_df = constructor_df.drop("url")

In [0]:
# Rename the camelCase columns to snake_case and add an ingestion_date column
# holding the current timestamp.
constructor_final_df = (
    constructor_dropped_df
    .withColumnRenamed("constructorId", "constructor_id")
    .withColumnRenamed("constructorRef", "constructor_ref")
    .withColumn("ingestion_date", current_timestamp())
)

In [0]:
# Preview the renamed constructors DataFrame, including the new ingestion_date column.
display(constructor_final_df)

constructor_id,constructor_ref,name,nationality,ingestion_date
1,mclaren,McLaren,British,2023-02-14T06:31:02.481+0000
2,bmw_sauber,BMW Sauber,German,2023-02-14T06:31:02.481+0000
3,williams,Williams,British,2023-02-14T06:31:02.481+0000
4,renault,Renault,French,2023-02-14T06:31:02.481+0000
5,toro_rosso,Toro Rosso,Italian,2023-02-14T06:31:02.481+0000
6,ferrari,Ferrari,Italian,2023-02-14T06:31:02.481+0000
7,toyota,Toyota,Japanese,2023-02-14T06:31:02.481+0000
8,super_aguri,Super Aguri,Japanese,2023-02-14T06:31:02.481+0000
9,red_bull,Red Bull,Austrian,2023-02-14T06:31:02.481+0000
10,force_india,Force India,Indian,2023-02-14T06:31:02.481+0000


#### Step 4. Write output to a Parquet file

In [0]:
# Write the constructors as Parquet, overwriting any existing output at this path.
constructor_final_df.write.parquet("./constructors", mode="overwrite")


### Read drivers.json

In [0]:
from pyspark.sql.types import StructType,IntegerType, StructField, StringType,DateType, FloatType

In [0]:
# Schema of the nested "name" struct used by drivers_schema.
name_schema = StructType([
    StructField("forename", StringType(), nullable=True),
    StructField("surname", StringType(), nullable=True),
])

In [0]:
# Explicit schema for drivers.json; "name" is a nested struct described by name_schema.
drivers_schema = StructType([
    StructField("driverId", IntegerType(), nullable=False),
    StructField("driverRef", StringType(), nullable=True),
    StructField("number", IntegerType(), nullable=True),
    StructField("code", StringType(), nullable=True),
    StructField("name", name_schema),  # nullable left at its default
    StructField("dob", DateType(), nullable=True),
    StructField("nationality", StringType(), nullable=True),
    StructField("url", StringType(), nullable=True),
])

In [0]:
# Load drivers.json as JSON using the explicit drivers_schema.
drivers_df = (
    spark.read
    .schema(drivers_schema)
    .json("dbfs:/FileStore/shared_uploads/antony.prince@thoughtworks.com/drivers.json")
)

In [0]:
# Preview the drivers DataFrame; "name" is still a nested struct at this point.
display(drivers_df)

driverId,driverRef,number,code,name,dob,nationality,url
1,hamilton,44.0,HAM,"List(Lewis, Hamilton)",1985-01-07,British,http://en.wikipedia.org/wiki/Lewis_Hamilton
2,heidfeld,,HEI,"List(Nick, Heidfeld)",1977-05-10,German,http://en.wikipedia.org/wiki/Nick_Heidfeld
3,rosberg,6.0,ROS,"List(Nico, Rosberg)",1985-06-27,German,http://en.wikipedia.org/wiki/Nico_Rosberg
4,alonso,14.0,ALO,"List(Fernando, Alonso)",1981-07-29,Spanish,http://en.wikipedia.org/wiki/Fernando_Alonso
5,kovalainen,,KOV,"List(Heikki, Kovalainen)",1981-10-19,Finnish,http://en.wikipedia.org/wiki/Heikki_Kovalainen
6,nakajima,,NAK,"List(Kazuki, Nakajima)",1985-01-11,Japanese,http://en.wikipedia.org/wiki/Kazuki_Nakajima
7,bourdais,,BOU,"List(Sébastien, Bourdais)",1979-02-28,French,http://en.wikipedia.org/wiki/S%C3%A9bastien_Bourdais
8,raikkonen,7.0,RAI,"List(Kimi, Räikkönen)",1979-10-17,Finnish,http://en.wikipedia.org/wiki/Kimi_R%C3%A4ikk%C3%B6nen
9,kubica,88.0,KUB,"List(Robert, Kubica)",1984-12-07,Polish,http://en.wikipedia.org/wiki/Robert_Kubica
10,glock,,GLO,"List(Timo, Glock)",1982-03-18,German,http://en.wikipedia.org/wiki/Timo_Glock


Rename columns, add the ingestion date, and combine forename and surname into a single name column

In [0]:
from pyspark.sql.functions import col,concat,current_timestamp,lit

In [0]:
# Rename the id/ref columns to snake_case, add ingestion_date, and replace the
# nested name struct with a single "forename surname" string.
drivers_with_column_df = (
    drivers_df
    .withColumnRenamed("driverId", "driver_id")
    .withColumnRenamed("driverRef", "driver_ref")
    .withColumn("ingestion_date", current_timestamp())
    .withColumn(
        "name",
        concat(col("name.forename"), lit(" "), col("name.surname")),
    )
)

In [0]:
# Preview the drivers DataFrame after the renames and the flattened name column.
display(drivers_with_column_df)

driver_id,driver_ref,number,code,name,dob,nationality,url,ingestion_date
1,hamilton,44.0,HAM,Lewis Hamilton,1985-01-07,British,http://en.wikipedia.org/wiki/Lewis_Hamilton,2023-02-14T07:01:11.055+0000
2,heidfeld,,HEI,Nick Heidfeld,1977-05-10,German,http://en.wikipedia.org/wiki/Nick_Heidfeld,2023-02-14T07:01:11.055+0000
3,rosberg,6.0,ROS,Nico Rosberg,1985-06-27,German,http://en.wikipedia.org/wiki/Nico_Rosberg,2023-02-14T07:01:11.055+0000
4,alonso,14.0,ALO,Fernando Alonso,1981-07-29,Spanish,http://en.wikipedia.org/wiki/Fernando_Alonso,2023-02-14T07:01:11.055+0000
5,kovalainen,,KOV,Heikki Kovalainen,1981-10-19,Finnish,http://en.wikipedia.org/wiki/Heikki_Kovalainen,2023-02-14T07:01:11.055+0000
6,nakajima,,NAK,Kazuki Nakajima,1985-01-11,Japanese,http://en.wikipedia.org/wiki/Kazuki_Nakajima,2023-02-14T07:01:11.055+0000
7,bourdais,,BOU,Sébastien Bourdais,1979-02-28,French,http://en.wikipedia.org/wiki/S%C3%A9bastien_Bourdais,2023-02-14T07:01:11.055+0000
8,raikkonen,7.0,RAI,Kimi Räikkönen,1979-10-17,Finnish,http://en.wikipedia.org/wiki/Kimi_R%C3%A4ikk%C3%B6nen,2023-02-14T07:01:11.055+0000
9,kubica,88.0,KUB,Robert Kubica,1984-12-07,Polish,http://en.wikipedia.org/wiki/Robert_Kubica,2023-02-14T07:01:11.055+0000
10,glock,,GLO,Timo Glock,1982-03-18,German,http://en.wikipedia.org/wiki/Timo_Glock,2023-02-14T07:01:11.055+0000


Drop unwanted columns

In [0]:
# Remove the url column by name; the remaining columns are kept unchanged.
drivers_final_df = drivers_with_column_df.drop("url")

In [0]:
# Write the drivers as Parquet, overwriting any existing output at this path.
drivers_final_df.write.parquet("./drivers", mode="overwrite")


Read results.json

In [0]:
# Explicit schema for results.json. Only resultId is declared non-nullable.
results_schema = StructType([
    StructField("resultId", IntegerType(), nullable=False),
    StructField("raceId", IntegerType(), nullable=True),
    StructField("driverId", IntegerType(), nullable=True),
    StructField("constructorId", IntegerType(), nullable=True),
    StructField("number", IntegerType(), nullable=True),
    StructField("grid", IntegerType(), nullable=True),
    StructField("position", IntegerType(), nullable=True),
    StructField("positionText", StringType(), nullable=True),
    StructField("positionOrder", IntegerType(), nullable=True),
    StructField("points", FloatType(), nullable=True),
    StructField("laps", IntegerType(), nullable=True),
    StructField("time", StringType(), nullable=True),
    StructField("milliseconds", IntegerType(), nullable=True),
    StructField("fastestLap", IntegerType(), nullable=True),
    StructField("rank", IntegerType(), nullable=True),
    StructField("fastestLapTime", StringType(), nullable=True),
    StructField("fastestLapSpeed", FloatType(), nullable=True),
    StructField("statusId", StringType(), nullable=True),
])

In [0]:
# Load results.json as JSON using the explicit results_schema.
results_df = (
    spark.read
    .schema(results_schema)
    .json("dbfs:/FileStore/shared_uploads/antony.prince@thoughtworks.com/results.json")
)

In [0]:
# Preview the results DataFrame as loaded from JSON.
display(results_df)

resultId,raceId,driverId,constructorId,number,grid,position,positionText,positionOrder,points,laps,time,milliseconds,fastestLap,rank,fastestLapTime,fastestLapSpeed,statusId
1,18,1,1,22,1,1.0,1,1,10.0,58,1:34:50.616,5690616.0,39.0,2.0,1:27.452,218.3,1
2,18,2,2,3,5,2.0,2,2,8.0,58,+5.478,5696094.0,41.0,3.0,1:27.739,217.586,1
3,18,3,3,7,7,3.0,3,3,6.0,58,+8.163,5698779.0,41.0,5.0,1:28.090,216.719,1
4,18,4,4,5,11,4.0,4,4,5.0,58,+17.181,5707797.0,58.0,7.0,1:28.603,215.464,1
5,18,5,1,23,3,5.0,5,5,4.0,58,+18.014,5708630.0,43.0,1.0,1:27.418,218.385,1
6,18,6,3,8,13,6.0,6,6,3.0,57,\N,,50.0,14.0,1:29.639,212.974,11
7,18,7,5,14,17,7.0,7,7,2.0,55,\N,,22.0,12.0,1:29.534,213.224,5
8,18,8,6,1,15,8.0,8,8,1.0,53,\N,,20.0,4.0,1:27.903,217.18,5
9,18,9,2,4,2,,R,9,0.0,47,\N,,15.0,9.0,1:28.753,215.1,4
10,18,10,7,12,18,,R,10,0.0,43,\N,,23.0,13.0,1:29.558,213.166,3


In [0]:
# Rename the camelCase columns to snake_case and add an ingestion_date column
# holding the current timestamp. statusId is not renamed here.
results_with_columns_df = (
    results_df
    .withColumnRenamed("resultId", "result_id")
    .withColumnRenamed("raceId", "race_id")
    .withColumnRenamed("driverId", "driver_id")
    .withColumnRenamed("constructorId", "constructor_id")
    .withColumnRenamed("positionText", "position_text")
    .withColumnRenamed("positionOrder", "position_order")
    .withColumnRenamed("fastestLap", "fastest_lap")
    .withColumnRenamed("fastestLapTime", "fastest_lap_time")
    .withColumnRenamed("fastestLapSpeed", "fastest_lap_speed")
    .withColumn("ingestion_date", current_timestamp())
)


In [0]:
# Preview the results DataFrame after the renames; statusId is still present here.
display(results_with_columns_df)

result_id,race_id,driver_id,constructor_id,number,grid,position,position_text,position_order,points,laps,time,milliseconds,fastest_lap,rank,fastest_lap_time,fastest_lap_speed,statusId,ingestion_date
1,18,1,1,22,1,1.0,1,1,10.0,58,1:34:50.616,5690616.0,39.0,2.0,1:27.452,218.3,1,2023-02-14T07:21:24.721+0000
2,18,2,2,3,5,2.0,2,2,8.0,58,+5.478,5696094.0,41.0,3.0,1:27.739,217.586,1,2023-02-14T07:21:24.721+0000
3,18,3,3,7,7,3.0,3,3,6.0,58,+8.163,5698779.0,41.0,5.0,1:28.090,216.719,1,2023-02-14T07:21:24.721+0000
4,18,4,4,5,11,4.0,4,4,5.0,58,+17.181,5707797.0,58.0,7.0,1:28.603,215.464,1,2023-02-14T07:21:24.721+0000
5,18,5,1,23,3,5.0,5,5,4.0,58,+18.014,5708630.0,43.0,1.0,1:27.418,218.385,1,2023-02-14T07:21:24.721+0000
6,18,6,3,8,13,6.0,6,6,3.0,57,\N,,50.0,14.0,1:29.639,212.974,11,2023-02-14T07:21:24.721+0000
7,18,7,5,14,17,7.0,7,7,2.0,55,\N,,22.0,12.0,1:29.534,213.224,5,2023-02-14T07:21:24.721+0000
8,18,8,6,1,15,8.0,8,8,1.0,53,\N,,20.0,4.0,1:27.903,217.18,5,2023-02-14T07:21:24.721+0000
9,18,9,2,4,2,,R,9,0.0,47,\N,,15.0,9.0,1:28.753,215.1,4,2023-02-14T07:21:24.721+0000
10,18,10,7,12,18,,R,10,0.0,43,\N,,23.0,13.0,1:29.558,213.166,3,2023-02-14T07:21:24.721+0000


In [0]:
# Remove the statusId column by name, rebinding the same variable to the result.
results_with_columns_df = results_with_columns_df.drop("statusId")

In [0]:
# Write the results as Parquet partitioned by race_id, overwriting any existing output.
results_with_columns_df.write.parquet(
    "./results",
    mode="overwrite",
    partitionBy="race_id",
)

In [0]:
# Explicit schema for pit_stops.json. Only raceId is declared non-nullable.
pit_stops_schema = StructType([
    StructField("raceId", IntegerType(), nullable=False),
    StructField("driverId", IntegerType(), nullable=True),
    StructField("stop", StringType(), nullable=True),
    StructField("lap", IntegerType(), nullable=True),
    StructField("time", StringType(), nullable=True),
    StructField("duration", StringType(), nullable=True),
    StructField("milliseconds", IntegerType(), nullable=True),
])

In [0]:
# Load pit_stops.json with the explicit schema; multiLine is enabled so that
# JSON records spanning several lines are parsed.
pit_stops_df = spark.read.json(
    "dbfs:/FileStore/shared_uploads/antony.prince@thoughtworks.com/pit_stops.json",
    schema=pit_stops_schema,
    multiLine=True,
)

In [0]:
# Rename the id columns to snake_case and add an ingestion_date column
# holding the current timestamp.
final_df = (
    pit_stops_df
    .withColumnRenamed("driverId", "driver_id")
    .withColumnRenamed("raceId", "race_id")
    .withColumn("ingestion_date", current_timestamp())
)

In [0]:
# Write the pit stops as Parquet, overwriting any existing output at this path.
# NOTE(review): this is an absolute mount path, while the earlier writes in this
# notebook use relative paths ("./constructors", "./drivers", "./results") — confirm intended.
final_df.write.parquet("/mnt/formula1dl/processed/pit_stops", mode="overwrite")