In [0]:
%run "../common_functions"

In [0]:
%run "../Paths_config"

In [0]:
from pyspark.sql.functions import col, concat, count, current_timestamp, lit, to_timestamp
from pyspark.sql.types import DateType, DoubleType, FloatType, IntegerType, StringType, StructField, StructType

In [0]:
# Explicit schema for circuits.csv; it is handed to the CSV reader in the next cell.
circuits_schema = (
    StructType()
    .add("circuitId", IntegerType(), False)
    .add("circuitRef", StringType(), True)
    .add("name", StringType(), True)
    .add("location", StringType(), True)
    .add("country", StringType(), True)
    .add("lat", DoubleType(), True)
    .add("lng", DoubleType(), True)
    .add("alt", IntegerType(), True)
    .add("url", StringType(), True)
)

In [0]:
# Load the raw circuits CSV (first line is a header) using the explicit schema.
circuits_df = spark.read.csv(
    f"{raw_folder_path}/circuits.csv",
    schema=circuits_schema,
    header=True,
)

In [0]:
# Preview the circuits data exactly as read from circuits.csv.
display(circuits_df)

circuitId,circuitRef,name,location,country,lat,lng,alt,url
1,albert_park,Albert Park Grand Prix Circuit,Melbourne,Australia,-37.8497,144.968,10,http://en.wikipedia.org/wiki/Melbourne_Grand_Prix_Circuit
2,sepang,Sepang International Circuit,Kuala Lumpur,Malaysia,2.76083,101.738,18,http://en.wikipedia.org/wiki/Sepang_International_Circuit
3,bahrain,Bahrain International Circuit,Sakhir,Bahrain,26.0325,50.5106,7,http://en.wikipedia.org/wiki/Bahrain_International_Circuit
4,catalunya,Circuit de Barcelona-Catalunya,Montmeló,Spain,41.57,2.26111,109,http://en.wikipedia.org/wiki/Circuit_de_Barcelona-Catalunya
5,istanbul,Istanbul Park,Istanbul,Turkey,40.9517,29.405,130,http://en.wikipedia.org/wiki/Istanbul_Park
6,monaco,Circuit de Monaco,Monte-Carlo,Monaco,43.7347,7.42056,7,http://en.wikipedia.org/wiki/Circuit_de_Monaco
7,villeneuve,Circuit Gilles Villeneuve,Montreal,Canada,45.5,-73.5228,13,http://en.wikipedia.org/wiki/Circuit_Gilles_Villeneuve
8,magny_cours,Circuit de Nevers Magny-Cours,Magny Cours,France,46.8642,3.16361,228,http://en.wikipedia.org/wiki/Circuit_de_Nevers_Magny-Cours
9,silverstone,Silverstone Circuit,Silverstone,UK,52.0786,-1.01694,153,http://en.wikipedia.org/wiki/Silverstone_Circuit
10,hockenheimring,Hockenheimring,Hockenheim,Germany,49.3278,8.56583,103,http://en.wikipedia.org/wiki/Hockenheimring


In [0]:
# Keep every circuits column except `url`.
circuits_df1 = circuits_df.select(
    *(
        col(column_name)
        for column_name in (
            "circuitId", "circuitRef", "name", "location",
            "country", "lat", "lng", "alt",
        )
    )
)

In [0]:
# Rename the camelCase / abbreviated columns to descriptive snake_case names and
# stamp every row with the time of this load in an `ingestion_date` column.
circuits_df2 = (
    circuits_df1
    .withColumnRenamed("circuitId", "circuit_id")
    .withColumnRenamed("circuitRef", "circuit_ref")
    .withColumnRenamed("lat", "latitude")
    .withColumnRenamed("lng", "longitude")
    .withColumnRenamed("alt", "altitude")
    .withColumn("ingestion_date", current_timestamp())
)

In [0]:
# circuits_final_df = add_ingestion_date(circuits_df2)
# Note: add_ingestion_date is a helper created in the common_functions notebook; it takes an input DataFrame and adds an ingestion_date column.

In [0]:
# Make sure the f1_processed database exists before any table is saved into it;
# its storage location is the `silver` folder under processed_folder_path.
create_processed_db_sql = f"""
    
CREATE DATABASE IF NOT EXISTS f1_processed
LOCATION '{processed_folder_path}/silver'
"""
spark.sql(create_processed_db_sql)

DataFrame[]

In [0]:
# Replace the contents of the Delta table f1_processed.circuits with this load.
(
    circuits_df2.write
    .mode("overwrite")
    .format("delta")
    .saveAsTable("f1_processed.circuits")
)

In [0]:
# Explicit schema for races.csv; `time` stays a string and is combined with `date` later.
races_schema = (
    StructType()
    .add("raceId", IntegerType(), False)
    .add("year", IntegerType(), True)
    .add("round", IntegerType(), True)
    .add("circuitId", IntegerType(), True)
    .add("name", StringType(), True)
    .add("date", DateType(), True)
    .add("time", StringType(), True)
    .add("url", StringType(), True)
)

In [0]:
# Load the raw races CSV (first line is a header) using the explicit schema.
races_df = spark.read.csv(
    f"{raw_folder_path}/races.csv",
    schema=races_schema,
    header=True,
)

In [0]:
# Preview the races data exactly as read from races.csv.
display(races_df)

raceId,year,round,circuitId,name,date,time,url
1,2009,1,1,Australian Grand Prix,2009-03-29,06:00:00,http://en.wikipedia.org/wiki/2009_Australian_Grand_Prix
2,2009,2,2,Malaysian Grand Prix,2009-04-05,09:00:00,http://en.wikipedia.org/wiki/2009_Malaysian_Grand_Prix
3,2009,3,17,Chinese Grand Prix,2009-04-19,07:00:00,http://en.wikipedia.org/wiki/2009_Chinese_Grand_Prix
4,2009,4,3,Bahrain Grand Prix,2009-04-26,12:00:00,http://en.wikipedia.org/wiki/2009_Bahrain_Grand_Prix
5,2009,5,4,Spanish Grand Prix,2009-05-10,12:00:00,http://en.wikipedia.org/wiki/2009_Spanish_Grand_Prix
6,2009,6,6,Monaco Grand Prix,2009-05-24,12:00:00,http://en.wikipedia.org/wiki/2009_Monaco_Grand_Prix
7,2009,7,5,Turkish Grand Prix,2009-06-07,12:00:00,http://en.wikipedia.org/wiki/2009_Turkish_Grand_Prix
8,2009,8,9,British Grand Prix,2009-06-21,12:00:00,http://en.wikipedia.org/wiki/2009_British_Grand_Prix
9,2009,9,20,German Grand Prix,2009-07-12,12:00:00,http://en.wikipedia.org/wiki/2009_German_Grand_Prix
10,2009,10,11,Hungarian Grand Prix,2009-07-26,12:00:00,http://en.wikipedia.org/wiki/2009_Hungarian_Grand_Prix


In [0]:
# Build a single `race_timestamp` column from the separate `date` and `time` columns.
race_datetime_text = concat(col('date'), lit(' '), col('time'))
races_df1 = races_df.withColumn(
    "race_timestamp",
    to_timestamp(race_datetime_text, 'yyyy-MM-dd HH:mm:ss'),
)

In [0]:
# Add the `ingestion_date` column via the shared add_ingestion_date helper
# (not defined in this notebook; presumably loaded by the %run of ../common_functions).
races_with_ingestion_date_df = add_ingestion_date(races_df1)

In [0]:
# Pick the columns kept for the processed races table, renaming ids to snake_case.
# `date`, `time` and `url` are left out; `race_timestamp` carries the date/time instead.
final_races_df = races_with_ingestion_date_df.select(
    col('raceId').alias('race_id'),
    col('year').alias('race_year'),
    col('round'),
    col('circuitId').alias('circuit_id'),
    col('name'),
    col('ingestion_date'),
    col('race_timestamp'),
)

In [0]:
# Preview the final races columns before they are written out.
display(final_races_df)

race_id,race_year,round,circuit_id,name,ingestion_date,race_timestamp
1,2009,1,1,Australian Grand Prix,2025-05-31T22:03:31.637937Z,2009-03-29T06:00:00Z
2,2009,2,2,Malaysian Grand Prix,2025-05-31T22:03:31.637937Z,2009-04-05T09:00:00Z
3,2009,3,17,Chinese Grand Prix,2025-05-31T22:03:31.637937Z,2009-04-19T07:00:00Z
4,2009,4,3,Bahrain Grand Prix,2025-05-31T22:03:31.637937Z,2009-04-26T12:00:00Z
5,2009,5,4,Spanish Grand Prix,2025-05-31T22:03:31.637937Z,2009-05-10T12:00:00Z
6,2009,6,6,Monaco Grand Prix,2025-05-31T22:03:31.637937Z,2009-05-24T12:00:00Z
7,2009,7,5,Turkish Grand Prix,2025-05-31T22:03:31.637937Z,2009-06-07T12:00:00Z
8,2009,8,9,British Grand Prix,2025-05-31T22:03:31.637937Z,2009-06-21T12:00:00Z
9,2009,9,20,German Grand Prix,2025-05-31T22:03:31.637937Z,2009-07-12T12:00:00Z
10,2009,10,11,Hungarian Grand Prix,2025-05-31T22:03:31.637937Z,2009-07-26T12:00:00Z


In [0]:
# Replace the contents of the Delta table f1_processed.races with this load.
(
    final_races_df.write
    .mode("overwrite")
    .format("delta")
    .saveAsTable("f1_processed.races")
)

In [0]:
# Explicit schema for constructors.json; it is handed to the JSON reader in the next cell.
constructors_schema = (
    StructType()
    .add("constructorId", IntegerType(), False)
    .add("constructorRef", StringType(), True)
    .add("name", StringType(), True)
    .add("nationality", StringType(), True)
    .add("url", StringType(), True)
)

In [0]:
# Load the raw constructors JSON file using the explicit schema.
constructor_df = spark.read.json(
    f"{raw_folder_path}/constructors.json",
    schema=constructors_schema,
)

In [0]:
# Preview the constructors data exactly as read from constructors.json.
display(constructor_df)

constructorId,constructorRef,name,nationality,url
1,mclaren,McLaren,British,http://en.wikipedia.org/wiki/McLaren
2,bmw_sauber,BMW Sauber,German,http://en.wikipedia.org/wiki/BMW_Sauber
3,williams,Williams,British,http://en.wikipedia.org/wiki/Williams_Grand_Prix_Engineering
4,renault,Renault,French,http://en.wikipedia.org/wiki/Renault_in_Formula_One
5,toro_rosso,Toro Rosso,Italian,http://en.wikipedia.org/wiki/Scuderia_Toro_Rosso
6,ferrari,Ferrari,Italian,http://en.wikipedia.org/wiki/Scuderia_Ferrari
7,toyota,Toyota,Japanese,http://en.wikipedia.org/wiki/Toyota_Racing
8,super_aguri,Super Aguri,Japanese,http://en.wikipedia.org/wiki/Super_Aguri_F1
9,red_bull,Red Bull,Austrian,http://en.wikipedia.org/wiki/Red_Bull_Racing
10,force_india,Force India,Indian,http://en.wikipedia.org/wiki/Racing_Point_Force_India


In [0]:
# Remove the `url` column; every other constructors column is kept.
constructor_df1 = constructor_df.drop(col('url'))

In [0]:
# Rename the camelCase id/ref columns to snake_case.
constructor_df2 = (
    constructor_df1
    .withColumnRenamed("constructorId", "constructor_id")
    .withColumnRenamed("constructorRef", "constructor_ref")
)

In [0]:
# Add the `ingestion_date` column via the shared add_ingestion_date helper
# (not defined in this notebook; presumably loaded by the %run of ../common_functions).
final_constructor_df = add_ingestion_date(constructor_df2)

In [0]:
# Preview the final constructors columns before they are written out.
display(final_constructor_df)

constructor_id,constructor_ref,name,nationality,ingestion_date
1,mclaren,McLaren,British,2025-05-31T22:13:36.476062Z
2,bmw_sauber,BMW Sauber,German,2025-05-31T22:13:36.476062Z
3,williams,Williams,British,2025-05-31T22:13:36.476062Z
4,renault,Renault,French,2025-05-31T22:13:36.476062Z
5,toro_rosso,Toro Rosso,Italian,2025-05-31T22:13:36.476062Z
6,ferrari,Ferrari,Italian,2025-05-31T22:13:36.476062Z
7,toyota,Toyota,Japanese,2025-05-31T22:13:36.476062Z
8,super_aguri,Super Aguri,Japanese,2025-05-31T22:13:36.476062Z
9,red_bull,Red Bull,Austrian,2025-05-31T22:13:36.476062Z
10,force_india,Force India,Indian,2025-05-31T22:13:36.476062Z


In [0]:
# Replace the contents of the Delta table f1_processed.constructors with this load.
(
    final_constructor_df.write
    .mode("overwrite")
    .format("delta")
    .saveAsTable("f1_processed.constructors")
)

In [0]:
# Schema of the nested `name` object inside each driver record (used by drivers_schema).
name_schema = (
    StructType()
    .add("forename", StringType(), True)
    .add("surname", StringType(), True)
)

In [0]:
# Explicit schema for drivers.json; `name` is a nested struct described by name_schema.
drivers_schema = (
    StructType()
    .add("driverId", IntegerType(), False)
    .add("driverRef", StringType(), True)
    .add("number", IntegerType(), True)
    .add("code", StringType(), True)
    .add("name", name_schema)
    .add("dob", DateType(), True)
    .add("nationality", StringType(), True)
    .add("url", StringType(), True)
)

In [0]:
# Load the raw drivers JSON file using the explicit schema.
drivers_df = spark.read.json(
    f"{raw_folder_path}/drivers.json",
    schema=drivers_schema,
)

In [0]:
# Preview the drivers data as read from drivers.json (`name` is still a nested struct here).
display(drivers_df)

driverId,driverRef,number,code,name,dob,nationality,url
1,hamilton,44.0,HAM,"List(Lewis, Hamilton)",1985-01-07,British,http://en.wikipedia.org/wiki/Lewis_Hamilton
2,heidfeld,,HEI,"List(Nick, Heidfeld)",1977-05-10,German,http://en.wikipedia.org/wiki/Nick_Heidfeld
3,rosberg,6.0,ROS,"List(Nico, Rosberg)",1985-06-27,German,http://en.wikipedia.org/wiki/Nico_Rosberg
4,alonso,14.0,ALO,"List(Fernando, Alonso)",1981-07-29,Spanish,http://en.wikipedia.org/wiki/Fernando_Alonso
5,kovalainen,,KOV,"List(Heikki, Kovalainen)",1981-10-19,Finnish,http://en.wikipedia.org/wiki/Heikki_Kovalainen
6,nakajima,,NAK,"List(Kazuki, Nakajima)",1985-01-11,Japanese,http://en.wikipedia.org/wiki/Kazuki_Nakajima
7,bourdais,,BOU,"List(Sébastien, Bourdais)",1979-02-28,French,http://en.wikipedia.org/wiki/S%C3%A9bastien_Bourdais
8,raikkonen,7.0,RAI,"List(Kimi, Räikkönen)",1979-10-17,Finnish,http://en.wikipedia.org/wiki/Kimi_R%C3%A4ikk%C3%B6nen
9,kubica,88.0,KUB,"List(Robert, Kubica)",1984-12-07,Polish,http://en.wikipedia.org/wiki/Robert_Kubica
10,glock,,GLO,"List(Timo, Glock)",1982-03-18,German,http://en.wikipedia.org/wiki/Timo_Glock


In [0]:
# Add the `ingestion_date` column via the shared add_ingestion_date helper
# (not defined in this notebook; presumably loaded by the %run of ../common_functions).
drivers_with_ingestion_date_df = add_ingestion_date(drivers_df)

In [0]:
# Rename the id/ref columns to snake_case, replace the nested `name` struct with a
# single "forename surname" string, and drop `url`.
driver_full_name = concat(col("name.forename"), lit(" "), col("name.surname"))
final_drivers_df = (
    drivers_with_ingestion_date_df
    .withColumnRenamed("driverId", "driver_id")
    .withColumnRenamed("driverRef", "driver_ref")
    .withColumn("name", driver_full_name)
    .drop(col('url'))
)

In [0]:
# Preview the final drivers columns before they are written out.
display(final_drivers_df)

driver_id,driver_ref,number,code,name,dob,nationality,ingestion_date
1,hamilton,44.0,HAM,Lewis Hamilton,1985-01-07,British,2025-05-31T22:20:01.717814Z
2,heidfeld,,HEI,Nick Heidfeld,1977-05-10,German,2025-05-31T22:20:01.717814Z
3,rosberg,6.0,ROS,Nico Rosberg,1985-06-27,German,2025-05-31T22:20:01.717814Z
4,alonso,14.0,ALO,Fernando Alonso,1981-07-29,Spanish,2025-05-31T22:20:01.717814Z
5,kovalainen,,KOV,Heikki Kovalainen,1981-10-19,Finnish,2025-05-31T22:20:01.717814Z
6,nakajima,,NAK,Kazuki Nakajima,1985-01-11,Japanese,2025-05-31T22:20:01.717814Z
7,bourdais,,BOU,Sébastien Bourdais,1979-02-28,French,2025-05-31T22:20:01.717814Z
8,raikkonen,7.0,RAI,Kimi Räikkönen,1979-10-17,Finnish,2025-05-31T22:20:01.717814Z
9,kubica,88.0,KUB,Robert Kubica,1984-12-07,Polish,2025-05-31T22:20:01.717814Z
10,glock,,GLO,Timo Glock,1982-03-18,German,2025-05-31T22:20:01.717814Z


In [0]:
# Replace the contents of the Delta table f1_processed.drivers with this load.
(
    final_drivers_df.write
    .mode("overwrite")
    .format("delta")
    .saveAsTable("f1_processed.drivers")
)

In [0]:
# Explicit schema for results.json; it is handed to the JSON reader in the next cell.
# `statusId` is read as a string and dropped again in the rename cell below.
results_schema = (
    StructType()
    .add("resultId", IntegerType(), False)
    .add("raceId", IntegerType(), True)
    .add("driverId", IntegerType(), True)
    .add("constructorId", IntegerType(), True)
    .add("number", IntegerType(), True)
    .add("grid", IntegerType(), True)
    .add("position", IntegerType(), True)
    .add("positionText", StringType(), True)
    .add("positionOrder", IntegerType(), True)
    .add("points", FloatType(), True)
    .add("laps", IntegerType(), True)
    .add("time", StringType(), True)
    .add("milliseconds", IntegerType(), True)
    .add("fastestLap", IntegerType(), True)
    .add("rank", IntegerType(), True)
    .add("fastestLapTime", StringType(), True)
    .add("fastestLapSpeed", FloatType(), True)
    .add("statusId", StringType(), True)
)

In [0]:
# Load the raw results JSON file using the explicit schema.
results_df = spark.read.json(
    f"{raw_folder_path}/results.json",
    schema=results_schema,
)

In [0]:
# Rename every camelCase column to snake_case and drop `statusId`.
results_with_columns_df = (
    results_df
    .withColumnRenamed("resultId", "result_id")
    .withColumnRenamed("raceId", "race_id")
    .withColumnRenamed("driverId", "driver_id")
    .withColumnRenamed("constructorId", "constructor_id")
    .withColumnRenamed("positionText", "position_text")
    .withColumnRenamed("positionOrder", "position_order")
    .withColumnRenamed("fastestLap", "fastest_lap")
    .withColumnRenamed("fastestLapTime", "fastest_lap_time")
    .withColumnRenamed("fastestLapSpeed", "fastest_lap_speed")
    .drop(col("statusId"))
)

In [0]:
# Add the `ingestion_date` column via the shared add_ingestion_date helper
# (not defined in this notebook; presumably loaded by the %run of ../common_functions).
final_results_df = add_ingestion_date(results_with_columns_df)

In [0]:
# Keep one row per (race_id, driver_id) pair before merging into the results table.
# NOTE(review): dropDuplicates does not let us choose which duplicate survives — confirm that is acceptable.
results_deduped_df = final_results_df.dropDuplicates(['race_id', 'driver_id'])

In [0]:
# Match target (tgt) and source (src) rows on result_id and race_id, then hand the
# de-duplicated results to merge_delta_data (helper not defined in this notebook).
merge_condition = " AND ".join([
    "tgt.result_id = src.result_id",
    "tgt.race_id = src.race_id",
])
merge_delta_data(
    results_deduped_df,
    'f1_processed',
    'results',
    processed_folder_path,
    merge_condition,
    'race_id',
)

In [0]:
%sql
-- Sanity check: number of rows currently in f1_processed.results.
SELECT COUNT(*) FROM f1_processed.results;

count(1)
24869


In [0]:
# Explicit schema for pit_stops.json; it is handed to the JSON reader in the next cell.
pit_stops_schema = (
    StructType()
    .add("raceId", IntegerType(), False)
    .add("driverId", IntegerType(), True)
    .add("stop", StringType(), True)
    .add("lap", IntegerType(), True)
    .add("time", StringType(), True)
    .add("duration", StringType(), True)
    .add("milliseconds", IntegerType(), True)
)

In [0]:
# Load the raw pit stops JSON with the multiLine option enabled, using the explicit schema.
pit_stops_df = spark.read.json(
    f"{raw_folder_path}/pit_stops.json",
    schema=pit_stops_schema,
    multiLine=True,
)

In [0]:
# Add the `ingestion_date` column via the shared add_ingestion_date helper
# (not defined in this notebook; presumably loaded by the %run of ../common_functions).
pit_stops_with_ingestion_date_df = add_ingestion_date(pit_stops_df)

In [0]:
# Rename the camelCase id columns to snake_case; all other pit stop columns are unchanged.
final_df = (
    pit_stops_with_ingestion_date_df
    .withColumnRenamed("driverId", "driver_id")
    .withColumnRenamed("raceId", "race_id")
)

In [0]:
# Match target (tgt) and source (src) rows on race_id, driver_id and stop, then hand
# the pit stop data to merge_delta_data (helper not defined in this notebook).
# The condition previously listed the race_id predicate twice; the duplicate was
# redundant, so removing it leaves the match logic unchanged.
merge_condition = "tgt.race_id = src.race_id AND tgt.driver_id = src.driver_id AND tgt.stop = src.stop"
merge_delta_data(final_df, 'f1_processed', 'pit_stops', processed_folder_path, merge_condition, 'race_id')

In [0]:
%sql
-- Sanity check: number of rows currently in f1_processed.pit_stops.
SELECT COUNT(*) FROM f1_processed.pit_stops

count(1)
8030


In [0]:
# Explicit schema for the lap_times CSV files; it is handed to the CSV reader in the next cell.
lap_times_schema = (
    StructType()
    .add("raceId", IntegerType(), False)
    .add("driverId", IntegerType(), True)
    .add("lap", IntegerType(), True)
    .add("position", IntegerType(), True)
    .add("time", StringType(), True)
    .add("milliseconds", IntegerType(), True)
)

In [0]:
# Load the CSV data under the lap_times path using the explicit schema.
# No header option is set, so the first line of each file is read as data.
lap_times_df = spark.read.csv(
    f"{raw_folder_path}/lap_times",
    schema=lap_times_schema,
)

In [0]:
# Add the `ingestion_date` column via the shared add_ingestion_date helper
# (not defined in this notebook; presumably loaded by the %run of ../common_functions).
lap_times_with_ingestion_date_df = add_ingestion_date(lap_times_df)

In [0]:
# Rename the camelCase id columns to snake_case; all other lap time columns are unchanged.
final_df = (
    lap_times_with_ingestion_date_df
    .withColumnRenamed("driverId", "driver_id")
    .withColumnRenamed("raceId", "race_id")
)

In [0]:
# Match target (tgt) and source (src) rows on race_id, driver_id and lap, then hand
# the lap time data to merge_delta_data (helper not defined in this notebook).
# The condition previously listed the race_id predicate twice; the duplicate was
# redundant, so removing it leaves the match logic unchanged.
merge_condition = "tgt.race_id = src.race_id AND tgt.driver_id = src.driver_id AND tgt.lap = src.lap"
merge_delta_data(final_df, 'f1_processed', 'lap_times', processed_folder_path, merge_condition, 'race_id')

In [0]:
%sql
-- Sanity check: number of rows currently in f1_processed.lap_times.
SELECT COUNT(*) FROM f1_processed.lap_times

count(1)
490904


In [0]:
# Explicit schema for the qualifying JSON files; the q1-q3 values are kept as strings.
qualifying_schema = (
    StructType()
    .add("qualifyId", IntegerType(), False)
    .add("raceId", IntegerType(), True)
    .add("driverId", IntegerType(), True)
    .add("constructorId", IntegerType(), True)
    .add("number", IntegerType(), True)
    .add("position", IntegerType(), True)
    .add("q1", StringType(), True)
    .add("q2", StringType(), True)
    .add("q3", StringType(), True)
)

In [0]:
# Load the JSON data under the qualifying path with the multiLine option enabled,
# using the explicit schema.
qualifying_df = spark.read.json(
    f"{raw_folder_path}/qualifying",
    schema=qualifying_schema,
    multiLine=True,
)

In [0]:
# Add the `ingestion_date` column via the shared add_ingestion_date helper
# (not defined in this notebook; presumably loaded by the %run of ../common_functions).
qualifying_with_ingestion_date_df = add_ingestion_date(qualifying_df)

In [0]:
# Rename the camelCase id columns to snake_case; all other qualifying columns are unchanged.
final_df = (
    qualifying_with_ingestion_date_df
    .withColumnRenamed("qualifyId", "qualify_id")
    .withColumnRenamed("driverId", "driver_id")
    .withColumnRenamed("raceId", "race_id")
    .withColumnRenamed("constructorId", "constructor_id")
)

In [0]:
# Match target (tgt) and source (src) rows on qualify_id and race_id, then hand the
# qualifying data to merge_delta_data (helper not defined in this notebook).
merge_condition = " AND ".join([
    "tgt.qualify_id = src.qualify_id",
    "tgt.race_id = src.race_id",
])
merge_delta_data(
    final_df,
    'f1_processed',
    'qualifying',
    processed_folder_path,
    merge_condition,
    'race_id',
)

In [0]:
%sql
-- Sanity check: number of rows currently in f1_processed.qualifying.
SELECT COUNT(*) FROM f1_processed.qualifying

count(1)
8694
