## Formula 1 project - final assignment 
The goal of this project is to present detailed data on Formula 1 races from 1950 to 2021.

#### 1. Setting up the connection between Azure Data Lake Storage Gen2 (ADLS) and Databricks

###### a) Defining the key values needed to connect Databricks to ADLS

In [0]:
client_id = "xxx"
tenant_id = "xxx"
client_secret = "xxx"

###### b) Setting up the connection between Databricks and ADLS

In [0]:
spark.conf.set("fs.azure.account.auth.type.formula1wa.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.formula1wa.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.formula1wa.dfs.core.windows.net", client_id)
spark.conf.set("fs.azure.account.oauth2.client.secret.formula1wa.dfs.core.windows.net", client_secret)
spark.conf.set("fs.azure.account.oauth2.client.endpoint.formula1wa.dfs.core.windows.net", f"https://login.microsoftonline.com/{tenant_id}/oauth2/token")
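
The five settings above share the same storage-account suffix and differ only in the key prefix, so they can be generated in one place. The sketch below is a minimal illustration, not the notebook's original code: the helper names `build_oauth_conf` and `apply_oauth_conf` are hypothetical, while the configuration keys are copied from the cell above. In a real workspace the three credentials would normally be read from a Databricks secret scope rather than typed into the notebook.

```python
def build_oauth_conf(account, client_id, client_secret, tenant_id):
    """Return the Spark settings for OAuth access to one ADLS Gen2 account."""
    suffix = f"{account}.dfs.core.windows.net"
    return {
        f"fs.azure.account.auth.type.{suffix}": "OAuth",
        f"fs.azure.account.oauth.provider.type.{suffix}":
            "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
        f"fs.azure.account.oauth2.client.id.{suffix}": client_id,
        f"fs.azure.account.oauth2.client.secret.{suffix}": client_secret,
        f"fs.azure.account.oauth2.client.endpoint.{suffix}":
            f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
    }


def apply_oauth_conf(spark, account, client_id, client_secret, tenant_id):
    """Set every OAuth key on the given Spark session (hypothetical helper)."""
    conf = build_oauth_conf(account, client_id, client_secret, tenant_id)
    for key, value in conf.items():
        spark.conf.set(key, value)
```

In the notebook this would be called as `apply_oauth_conf(spark, "formula1wa", client_id, client_secret, tenant_id)`.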

###### c) Listing the files available in the raw container of the ADLS account

In [0]:
%fs
ls abfss://raw@formula1wa.dfs.core.windows.net

path,name,size,modificationTime
abfss://raw@formula1wa.dfs.core.windows.net/circuits.csv,circuits.csv,10044,1687008648000
abfss://raw@formula1wa.dfs.core.windows.net/constructors.json,constructors.json,30415,1687008648000
abfss://raw@formula1wa.dfs.core.windows.net/drivers.json,drivers.json,180812,1687008649000
abfss://raw@formula1wa.dfs.core.windows.net/lap_times/,lap_times/,0,1687008965000
abfss://raw@formula1wa.dfs.core.windows.net/pit_stops.json,pit_stops.json,1369387,1687008659000
abfss://raw@formula1wa.dfs.core.windows.net/qualifying/,qualifying/,0,1687008920000
abfss://raw@formula1wa.dfs.core.windows.net/races.csv,races.csv,116847,1687008648000
abfss://raw@formula1wa.dfs.core.windows.net/results.json,results.json,7165641,1687008704000


#### 2. Data transformation and ingestion

#### Circuits

###### a) Importing the classes needed to define the schema of a DataFrame

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

###### b) Defining a new schema

In [0]:
circuits_schema = StructType(fields=[StructField("circuitId", IntegerType(), False),
                                     StructField("circuitRef", StringType(), True),
                                     StructField("name", StringType(), True),
                                     StructField("location", StringType(), True),
                                     StructField("country", StringType(), True),
                                     StructField("lat", DoubleType(), True),
                                     StructField("lng", DoubleType(), True),
                                     StructField("alt", IntegerType(), True)
])

###### c) Ingesting the CSV file into a PySpark DataFrame using the spark.read.csv() method

In [0]:
circuits_df = spark.read \
.option("header", True) \
.schema(circuits_schema) \
.csv("abfss://raw@formula1wa.dfs.core.windows.net/circuits.csv")

###### d) Selecting specific columns from circuits_df using the col() function

In [0]:
from pyspark.sql.functions import col

In [0]:
circuits_selected_df = circuits_df.select(col("circuitId"), col("circuitRef"), col("name"), col("location"), col("country"), col("lat"), col("lng"), col("alt"))

###### e) Renaming the columns

In [0]:
circuits_final_df = circuits_selected_df.withColumnRenamed("circuitId", "circuit_id") \
.withColumnRenamed("name", "Circuit_name") \
.withColumnRenamed("circuitRef", "circuit_ref") \
.withColumnRenamed("lat", "latitude") \
.withColumnRenamed("lng", "longitude") \
.withColumnRenamed("alt", "altitude") 
display(circuits_final_df)

circuit_id,circuit_ref,Circuit_name,location,country,latitude,longitude,altitude
1,albert_park,Albert Park Grand Prix Circuit,Melbourne,Australia,-37.8497,144.968,10
2,sepang,Sepang International Circuit,Kuala Lumpur,Malaysia,2.76083,101.738,18
3,bahrain,Bahrain International Circuit,Sakhir,Bahrain,26.0325,50.5106,7
4,catalunya,Circuit de Barcelona-Catalunya,Montmeló,Spain,41.57,2.26111,109
5,istanbul,Istanbul Park,Istanbul,Turkey,40.9517,29.405,130
6,monaco,Circuit de Monaco,Monte-Carlo,Monaco,43.7347,7.42056,7
7,villeneuve,Circuit Gilles Villeneuve,Montreal,Canada,45.5,-73.5228,13
8,magny_cours,Circuit de Nevers Magny-Cours,Magny Cours,France,46.8642,3.16361,228
9,silverstone,Silverstone Circuit,Silverstone,UK,52.0786,-1.01694,153
10,hockenheimring,Hockenheimring,Hockenheim,Germany,49.3278,8.56583,103


###### f) Saving the output in the parquet format

In [0]:
circuits_final_df.write.mode("overwrite").parquet("abfss://processed@formula1wa.dfs.core.windows.net/circuits")

#### Races

###### a) Importing the classes needed to define the schema of a DataFrame

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

###### b) Defining a new schema

In [0]:
races_schema = StructType(fields=[StructField("raceId", IntegerType(), False),
                                  StructField("year", IntegerType(), True),
                                  StructField("round", IntegerType(), True),
                                  StructField("circuitId", IntegerType(), True),
                                  StructField("name", StringType(), True),
                                  StructField("date", DateType(), True),
                                  StructField("time", StringType(), True),
                                  StructField("url", StringType(), True)
])

###### c) Ingesting the CSV file into a PySpark DataFrame using the spark.read.csv() method

In [0]:
races_df = spark.read \
.option("header", True) \
.schema(races_schema) \
.csv("abfss://raw@formula1wa.dfs.core.windows.net/races.csv")

###### d) Selecting and renaming specific columns from races_df using the col() function

In [0]:
from pyspark.sql.functions import col

In [0]:
races_final_df = races_df.select(col('raceId').alias('race_id'), col('year').alias('race_year'), col('round'), col('circuitId').alias('circuit_id'),col('name'), col('date'), col('time'))
display(races_final_df)

###### e) Saving the output in the parquet format

In [0]:
races_final_df.write.mode('overwrite').parquet('abfss://processed@formula1wa.dfs.core.windows.net/races')


#### Constructors

###### a) Defining a new schema

In [0]:
constructors_schema = "constructorId INT, constructorRef STRING, name STRING, nationality STRING, url STRING"
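
A DDL string like the one above is a compact alternative to the `StructType` objects used for circuits and races, since `spark.read.schema()` accepts either form. For wider tables the string becomes hard to review, so one option is to keep the columns as (name, type) pairs and join them. The sketch below assumes a hypothetical helper `to_ddl`; the field names and types are copied from the cell above.

```python
# (name, type) pairs copied from the constructors_schema string above
constructor_fields = [
    ("constructorId", "INT"),
    ("constructorRef", "STRING"),
    ("name", "STRING"),
    ("nationality", "STRING"),
    ("url", "STRING"),
]


def to_ddl(fields):
    """Join (name, type) pairs into a DDL-formatted schema string."""
    return ", ".join(f"{name} {dtype}" for name, dtype in fields)


constructors_schema = to_ddl(constructor_fields)
```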

###### b) Ingesting the JSON file into a PySpark DataFrame using the spark.read.json() method

In [0]:
constructor_df = spark.read \
.schema(constructors_schema) \
.json("abfss://raw@formula1wa.dfs.core.windows.net/constructors.json")

###### c) Dropping the url column

In [0]:
constructor_dropped_df = constructor_df.drop(col('url'))

###### d) Renaming the columns

In [0]:
from pyspark.sql.functions import col

In [0]:
constructor_final_df = constructor_dropped_df.withColumnRenamed("constructorId", "constructor_id") \
                                             .withColumnRenamed("constructorRef", "constructor_ref")
display(constructor_final_df)

constructor_id,constructor_ref,name,nationality
1,mclaren,McLaren,British
2,bmw_sauber,BMW Sauber,German
3,williams,Williams,British
4,renault,Renault,French
5,toro_rosso,Toro Rosso,Italian
6,ferrari,Ferrari,Italian
7,toyota,Toyota,Japanese
8,super_aguri,Super Aguri,Japanese
9,red_bull,Red Bull,Austrian
10,force_india,Force India,Indian


###### e) Saving the output in the parquet format

In [0]:
constructor_final_df.write.mode("overwrite").parquet("abfss://processed@formula1wa.dfs.core.windows.net/constructors")

#### Drivers

###### a) Importing the classes needed to define the schema of a DataFrame

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

###### b) Defining a new schema, including a nested schema for the name field

In [0]:
name_schema = StructType(fields=[StructField("forename", StringType(), True),
                                 StructField("surname", StringType(), True)
])

In [0]:
drivers_schema = StructType(fields=[StructField("driverId", IntegerType(), False),
                                    StructField("driverRef", StringType(), True),
                                    StructField("number", IntegerType(), True),
                                    StructField("code", StringType(), True),
                                    StructField("name", name_schema),
                                    StructField("dob", DateType(), True),
                                    StructField("nationality", StringType(), True),
                                    StructField("url", StringType(), True)  
])

###### c) Ingesting the JSON file into a PySpark DataFrame using the spark.read.json() method

In [0]:
drivers_df = spark.read \
.schema(drivers_schema) \
.json("abfss://raw@formula1wa.dfs.core.windows.net/drivers.json")

###### d) Renaming the columns and concatenating the forename and surname into a single name column

In [0]:
from pyspark.sql.functions import col, concat, lit

In [0]:
drivers_with_columns_df = drivers_df.withColumnRenamed("driverId", "driver_id") \
                                    .withColumnRenamed("driverRef", "driver_ref") \
                                    .withColumn("name", concat(col("name.forename"), lit(" "), col("name.surname")))

###### e) Dropping the url column

In [0]:
drivers_final_df = drivers_with_columns_df.drop(col("url"))
display(drivers_final_df)

###### f) Saving the output in the parquet format

In [0]:
drivers_final_df.write.mode("overwrite").parquet("abfss://processed@formula1wa.dfs.core.windows.net/drivers")

#### Results

###### a) Importing the classes needed to define the schema of a DataFrame

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

###### b) Defining a new schema

In [0]:
results_schema = StructType(fields=[StructField("resultId", IntegerType(), False),
                                    StructField("raceId", IntegerType(), True),
                                    StructField("driverId", IntegerType(), True),
                                    StructField("constructorId", IntegerType(), True),
                                    StructField("number", IntegerType(), True),
                                    StructField("grid", IntegerType(), True),
                                    StructField("position", IntegerType(), True),
                                    StructField("positionText", StringType(), True),
                                    StructField("positionOrder", IntegerType(), True),
                                    StructField("points", FloatType(), True),
                                    StructField("laps", IntegerType(), True),
                                    StructField("time", StringType(), True),
                                    StructField("milliseconds", IntegerType(), True),
                                    StructField("fastestLap", IntegerType(), True),
                                    StructField("rank", IntegerType(), True),
                                    StructField("fastestLapTime", StringType(), True),
                                    StructField("fastestLapSpeed", FloatType(), True),
                                    StructField("statusId", StringType(), True)])

###### c) Ingesting the JSON file into a PySpark DataFrame using the spark.read.json() method

In [0]:
results_df = spark.read \
.schema(results_schema) \
.json("abfss://raw@formula1wa.dfs.core.windows.net/results.json")

###### d) Renaming the columns

In [0]:
results_with_columns_df = results_df.withColumnRenamed("resultId", "result_id") \
                                    .withColumnRenamed("raceId", "race_id") \
                                    .withColumnRenamed("driverId", "driver_id") \
                                    .withColumnRenamed("constructorId", "constructor_id") \
                                    .withColumnRenamed("positionText", "position_text") \
                                    .withColumnRenamed("positionOrder", "position_order") \
                                    .withColumnRenamed("fastestLap", "fastest_lap") \
                                    .withColumnRenamed("fastestLapTime", "fastest_lap_time") \
                                    .withColumnRenamed("fastestLapSpeed", "fastest_lap_speed")
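
Every rename in the chain above follows the same camelCase to snake_case pattern, so the mapping can be derived instead of written out by hand. The sketch below is one possible way to do it and is not the notebook's original approach: `camel_to_snake` and `rename_to_snake_case` are hypothetical helpers, and the second one relies only on the `columns` attribute and the `toDF()` method of a PySpark DataFrame. Unlike the cell above, it would also rename `statusId` to `status_id`.

```python
import re


def camel_to_snake(name):
    """Convert a camelCase column name such as 'fastestLapSpeed' to snake_case."""
    # Insert an underscore wherever a lowercase letter or digit is followed by an uppercase letter
    return re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", name).lower()


def rename_to_snake_case(df):
    """Return the DataFrame with every column renamed to snake_case."""
    return df.toDF(*[camel_to_snake(c) for c in df.columns])
```

With these helpers the cell could be reduced to `results_with_columns_df = rename_to_snake_case(results_df)`.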

###### e) Dropping the statusId column

In [0]:
from pyspark.sql.functions import col

In [0]:
results_final_df = results_with_columns_df.drop(col("statusId"))
display(results_final_df)

###### f) Saving the output in the parquet format, partitioned by race_id

In [0]:
results_final_df.write.mode("overwrite").partitionBy("race_id").parquet("abfss://processed@formula1wa.dfs.core.windows.net/results")

#### Pit stops

###### a) Importing the classes needed to define the schema of a DataFrame

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

###### b) Defining a new schema

In [0]:
pit_stops_schema = StructType(fields=[StructField("raceId", IntegerType(), False),
                                      StructField("driverId", IntegerType(), True),
                                      StructField("stop", StringType(), True),
                                      StructField("lap", IntegerType(), True),
                                      StructField("time", StringType(), True),
                                      StructField("duration", StringType(), True),
                                      StructField("milliseconds", IntegerType(), True)
                                     ])

###### c) Ingesting the multi-line JSON file into a PySpark DataFrame using the spark.read.json() method

In [0]:
pit_stops_df = spark.read \
.schema(pit_stops_schema) \
.option("multiLine", True) \
.json("abfss://raw@formula1wa.dfs.core.windows.net/pit_stops.json")
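
The `multiLine` option matters because `spark.read.json()` by default expects JSON Lines, that is, one complete object per line, whereas a file holding a single array spread over several lines has to be parsed as one document. The pure-Python sketch below illustrates the difference with the standard `json` module. The two sample strings are invented for illustration and are not taken from pit_stops.json, and Spark itself typically returns null rows for unparsable lines instead of raising an error.

```python
import json

# JSON Lines: every line is a complete object (Spark's default expectation)
json_lines = '{"raceId": 1, "stop": "1"}\n{"raceId": 1, "stop": "2"}'

# Multi-line JSON: one array spread over several lines (needs multiLine=True in Spark)
multi_line = '[\n  {"raceId": 1, "stop": "1"},\n  {"raceId": 1, "stop": "2"}\n]'


def parse_line_by_line(text):
    """Parse each line on its own, the way a line-delimited reader would."""
    return [json.loads(line) for line in text.splitlines()]


# Works: each line is valid JSON on its own
records = parse_line_by_line(json_lines)

# Fails: the first line is just '[', which is not valid JSON by itself
try:
    parse_line_by_line(multi_line)
    line_by_line_ok = True
except json.JSONDecodeError:
    line_by_line_ok = False

# Works: the whole text is parsed as a single document
whole_document = json.loads(multi_line)
```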

###### d) Renaming the columns

In [0]:
pit_stops_final_df = pit_stops_df.withColumnRenamed("driverId", "driver_id") \
.withColumnRenamed("raceId", "race_id")
display(pit_stops_final_df)

###### e) Saving the output in the parquet format

In [0]:
pit_stops_final_df.write.mode("overwrite").parquet("abfss://processed@formula1wa.dfs.core.windows.net/pit_stops")

## Joining the data
The ingested DataFrames are joined so that the race results for a given year can be displayed together with detailed information about each race.

##### Join circuits to races

In [0]:
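race_circuits_df = races_final_df.withColumnRenamed("name", "race_name") \
.join(circuits_final_df.withColumnRenamed("location", "circuit_location"), "circuit_id", "inner") \
.select("race_id", "race_year", "race_name", "circuit_location")

##### Join results to all other dataframes

In [0]:
# Generic column names are renamed so that they match the columns selected in the next cell
drivers_renamed_df = drivers_final_df.withColumnRenamed("name", "driver_name") \
    .withColumnRenamed("number", "driver_number").withColumnRenamed("nationality", "driver_nationality")

race_results_df = results_final_df.withColumnRenamed("time", "race_time") \
                                  .join(race_circuits_df, "race_id") \
                                  .join(drivers_renamed_df, "driver_id") \
                                  .join(constructor_final_df.withColumnRenamed("name", "team"), "constructor_id")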

In [0]:
final_df = race_results_df.select("race_year", "race_name", "circuit_location", "driver_name", "driver_number", "driver_nationality",
                                 "team", "grid", "fastest_lap", "race_time", "points", "position")

In [0]:
display(final_df.filter("race_year == 2020 and race_name == 'Abu Dhabi Grand Prix'").orderBy(final_df.points.desc()))

race_year,race_name,circuit_location,driver_name,driver_number,driver_nationality,team,grid,fastest_lap,race_time,points,position
2020,Abu Dhabi Grand Prix,Abu Dhabi,Max Verstappen,33,Dutch,Red Bull,1,14,1:36:28.645,25.0,1.0
2020,Abu Dhabi Grand Prix,Abu Dhabi,Valtteri Bottas,77,Finnish,Mercedes,2,40,+15.976,18.0,2.0
2020,Abu Dhabi Grand Prix,Abu Dhabi,Lewis Hamilton,44,British,Mercedes,3,37,+18.415,15.0,3.0
2020,Abu Dhabi Grand Prix,Abu Dhabi,Alexander Albon,23,Thai,Red Bull,5,42,+19.987,12.0,4.0
2020,Abu Dhabi Grand Prix,Abu Dhabi,Lando Norris,4,British,McLaren,4,53,+1:00.729,10.0,5.0
2020,Abu Dhabi Grand Prix,Abu Dhabi,Carlos Sainz,55,Spanish,McLaren,6,48,+1:05.662,8.0,6.0
2020,Abu Dhabi Grand Prix,Abu Dhabi,Daniel Ricciardo,3,Australian,Renault,11,55,+1:13.748,7.0,7.0
2020,Abu Dhabi Grand Prix,Abu Dhabi,Pierre Gasly,10,French,AlphaTauri,9,53,+1:29.718,4.0,8.0
2020,Abu Dhabi Grand Prix,Abu Dhabi,Esteban Ocon,31,French,Renault,10,47,+1:41.069,2.0,9.0
2020,Abu Dhabi Grand Prix,Abu Dhabi,Lance Stroll,18,Canadian,Racing Point,8,41,+1:42.738,1.0,10.0


In [0]:
final_df.write.mode("overwrite").parquet("abfss://presentation@formula1wa.dfs.core.windows.net/race_results")

## Driver standings
The table presents the driver standings for each year: total points and number of wins per driver and team.

In [0]:
race_results_df = spark.read.parquet("abfss://presentation@formula1wa.dfs.core.windows.net/race_results")

In [0]:
from pyspark.sql.functions import sum, when, count, col

driver_standings_df = race_results_df \
.groupBy("race_year", "driver_name", "driver_nationality", "team") \
.agg(sum("points").alias("total_points"),
     count(when(col("position") == 1, True)).alias("wins"))
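
The expression `count(when(col("position") == 1, True))` counts wins because `when()` without `otherwise()` yields null for rows that do not match, and `count()` skips nulls. The plain-Python sketch below mirrors that logic so the semantics are easier to follow. The rows and driver names are hypothetical and are not taken from the dataset, and the grouping key is shortened to year and driver for brevity.

```python
from collections import defaultdict

# Hypothetical rows: (race_year, driver_name, points, position)
rows = [
    (2020, "Driver A", 25.0, 1),
    (2020, "Driver A", 18.0, 2),
    (2020, "Driver B", 18.0, 2),
    (2020, "Driver B", 25.0, 1),
    (2020, "Driver A", 25.0, 1),
    (2020, "Driver C", 0.0, None),  # no classified finish: position is null
]

standings = defaultdict(lambda: {"total_points": 0.0, "wins": 0})
for race_year, driver_name, points, position in rows:
    entry = standings[(race_year, driver_name)]
    # Mirrors sum("points")
    entry["total_points"] += points
    # Mirrors count(when(position == 1, True)): only matching rows are counted
    if position == 1:
        entry["wins"] += 1
```

Because the notebook also groups by `team`, a driver who changes teams during a season would appear once per team in the result.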

In [0]:
display(driver_standings_df.filter("race_year = 2020"))

race_year,driver_name,driver_nationality,team,total_points,wins
2020,Lance Stroll,Canadian,Racing Point,75.0,0
2020,Kevin Magnussen,Danish,Haas F1 Team,1.0,0
2020,Antonio Giovinazzi,Italian,Alfa Romeo,4.0,0
2020,Carlos Sainz,Spanish,McLaren,105.0,0
2020,Nicholas Latifi,Canadian,Williams,0.0,0
2020,Lewis Hamilton,British,Mercedes,347.0,11
2020,Pierre Gasly,French,AlphaTauri,75.0,1
2020,Jack Aitken,British,Williams,0.0,0
2020,Romain Grosjean,French,Haas F1 Team,2.0,0
2020,Daniil Kvyat,Russian,AlphaTauri,32.0,0

