# ETL using Databricks & Azure Data Lake

## Extraction

In [0]:
# NOTE(review): this cell is entirely commented out, yet the following cell
# passes `dl_spec` and `dl_key` to spark.conf.set. On a fresh session those
# names are undefined unless something else provides them — confirm and
# re-enable the lines below if they are the intended source.
# %run "configuration"
# dl_spec = datalake_spec
# dl_key = datalake_key



In [0]:
# Apply the data lake setting to the Spark session configuration.
# Presumably dl_spec is the config key and dl_key the storage credential — TODO confirm.
spark.conf.set(dl_spec, dl_key)

In [0]:
# Load the two raw CSV files from the data lake, using the first line of each
# file as the column names. No schema is supplied, so every column comes back
# as a string (see the printSchema output further down).
circuits_df = spark.read.csv(
    "abfss://container@arianazuredl1.dfs.core.windows.net/circuits.csv",
    header=True,
)
races_df = spark.read.csv(
    "abfss://container@arianazuredl1.dfs.core.windows.net/races.csv",
    header=True,
)


In [0]:
# Preview the circuits data to sanity-check the CSV read.
circuits_df.show()
# display(circuits_df)

+---------+--------------+--------------------+------------+---------+--------+---------+---+--------------------+
|circuitId|    circuitRef|                name|    location|  country|     lat|      lng|alt|                 url|
+---------+--------------+--------------------+------------+---------+--------+---------+---+--------------------+
|        1|   albert_park|Albert Park Grand...|   Melbourne|Australia|-37.8497|  144.968| 10|http://en.wikiped...|
|        2|        sepang|Sepang Internatio...|Kuala Lumpur| Malaysia| 2.76083|  101.738| 18|http://en.wikiped...|
|        3|       bahrain|Bahrain Internati...|      Sakhir|  Bahrain| 26.0325|  50.5106|  7|http://en.wikiped...|
|        4|     catalunya|Circuit de Barcel...|    Montmeló|    Spain|   41.57|  2.26111|109|http://en.wikiped...|
|        5|      istanbul|       Istanbul Park|    Istanbul|   Turkey| 40.9517|   29.405|130|http://en.wikiped...|
|        6|        monaco|   Circuit de Monaco| Monte-Carlo|   Monaco| 43.7347| 

In [0]:
# Preview the races data to sanity-check the CSV read.
races_df.show()

+------+----+-----+---------+--------------------+----------+--------+--------------------+
|raceId|year|round|circuitId|                name|      date|    time|                 url|
+------+----+-----+---------+--------------------+----------+--------+--------------------+
|     1|2009|    1|        1|Australian Grand ...|2009-03-29|06:00:00|http://en.wikiped...|
|     2|2009|    2|        2|Malaysian Grand Prix|2009-04-05|09:00:00|http://en.wikiped...|
|     3|2009|    3|       17|  Chinese Grand Prix|2009-04-19|07:00:00|http://en.wikiped...|
|     4|2009|    4|        3|  Bahrain Grand Prix|2009-04-26|12:00:00|http://en.wikiped...|
|     5|2009|    5|        4|  Spanish Grand Prix|2009-05-10|12:00:00|http://en.wikiped...|
|     6|2009|    6|        6|   Monaco Grand Prix|2009-05-24|12:00:00|http://en.wikiped...|
|     7|2009|    7|        5|  Turkish Grand Prix|2009-06-07|12:00:00|http://en.wikiped...|
|     8|2009|    8|        9|  British Grand Prix|2009-06-21|12:00:00|http://en.

In [0]:
# Inspect the column types Spark assigned to the circuits data.
circuits_df.printSchema()
#circuits_df.describe().show()

root
 |-- circuitId: string (nullable = true)
 |-- circuitRef: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- country: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- lng: string (nullable = true)
 |-- alt: string (nullable = true)
 |-- url: string (nullable = true)



In [0]:
# Inspect the column types Spark assigned to the races data.
races_df.printSchema()

root
 |-- raceId: string (nullable = true)
 |-- year: string (nullable = true)
 |-- round: string (nullable = true)
 |-- circuitId: string (nullable = true)
 |-- name: string (nullable = true)
 |-- date: string (nullable = true)
 |-- time: string (nullable = true)
 |-- url: string (nullable = true)



## Transform

#### Select the required columns

In [0]:
from pyspark.sql.functions import col

In [0]:
# Keep only the columns needed downstream and rename them to snake_case.
# The source `url` column is intentionally not carried forward.
circuits_selected_df = circuits_df.select(
    circuits_df["circuitId"].alias("circuit_id"),
    circuits_df["circuitRef"].alias("circuit_ref"),
    circuits_df["name"],
    circuits_df["location"],
    circuits_df["country"],
    circuits_df["lat"].alias("latitude"),
    circuits_df["lng"].alias("longitude"),
    circuits_df["alt"].alias("altitude"),
)

In [0]:
# Preview the selected and renamed circuits columns.
circuits_selected_df.show()

+----------+--------------+--------------------+------------+---------+--------+---------+--------+
|circuit_id|   circuit_ref|                name|    location|  country|latitude|longitude|altitude|
+----------+--------------+--------------------+------------+---------+--------+---------+--------+
|         1|   albert_park|Albert Park Grand...|   Melbourne|Australia|-37.8497|  144.968|      10|
|         2|        sepang|Sepang Internatio...|Kuala Lumpur| Malaysia| 2.76083|  101.738|      18|
|         3|       bahrain|Bahrain Internati...|      Sakhir|  Bahrain| 26.0325|  50.5106|       7|
|         4|     catalunya|Circuit de Barcel...|    Montmeló|    Spain|   41.57|  2.26111|     109|
|         5|      istanbul|       Istanbul Park|    Istanbul|   Turkey| 40.9517|   29.405|     130|
|         6|        monaco|   Circuit de Monaco| Monte-Carlo|   Monaco| 43.7347|  7.42056|       7|
|         7|    villeneuve|Circuit Gilles Vi...|    Montreal|   Canada|    45.5| -73.5228|      13|


#### Adding ingestion date as a column 

In [0]:
from pyspark.sql.functions import current_timestamp

In [0]:
# Add an `ingestion_date` column holding the current timestamp to every circuits row.
circuits_added_df = circuits_selected_df.withColumn("ingestion_date",current_timestamp())

In [0]:
from pyspark.sql.functions import current_timestamp, lit, to_timestamp, concat, col

In [0]:
# Add two columns to the races data:
#   * ingestion_date - the current timestamp
#   * race_timestamp - `date` and `time` joined with a space and parsed
#                      using the 'yyyy-MM-dd HH:mm:ss' pattern
races_with_timestamp_df = (
    races_df
    .withColumn("ingestion_date", current_timestamp())
    .withColumn(
        "race_timestamp",
        to_timestamp(
            concat(col("date"), lit(" "), col("time")),
            "yyyy-MM-dd HH:mm:ss",
        ),
    )
)


In [0]:
# Preview the races data with the two added columns.
races_with_timestamp_df.show()

+------+----+-----+---------+--------------------+----------+--------+--------------------+--------------------+-------------------+
|raceId|year|round|circuitId|                name|      date|    time|                 url|      ingestion_date|     race_timestamp|
+------+----+-----+---------+--------------------+----------+--------+--------------------+--------------------+-------------------+
|     1|2009|    1|        1|Australian Grand ...|2009-03-29|06:00:00|http://en.wikiped...|2023-09-14 18:53:...|2009-03-29 06:00:00|
|     2|2009|    2|        2|Malaysian Grand Prix|2009-04-05|09:00:00|http://en.wikiped...|2023-09-14 18:53:...|2009-04-05 09:00:00|
|     3|2009|    3|       17|  Chinese Grand Prix|2009-04-19|07:00:00|http://en.wikiped...|2023-09-14 18:53:...|2009-04-19 07:00:00|
|     4|2009|    4|        3|  Bahrain Grand Prix|2009-04-26|12:00:00|http://en.wikiped...|2023-09-14 18:53:...|2009-04-26 12:00:00|
|     5|2009|    5|        4|  Spanish Grand Prix|2009-05-10|12:00:00

#### Replace null values

In [0]:
# Replace missing `country` / `location` values with an explicit placeholder.
# The placeholder was previously misspelled ("unkown"); because this DataFrame
# is later written to the processed parquet output, the typo would be persisted
# and would not match a filter on "unknown".
cleaned_circuits_df = circuits_added_df.na.fill(
    {
        "country": "unknown",
        "location": "unknown",
    }
)

#### Drop duplicate rows

In [0]:
# Remove duplicate rows; with no column subset given, all columns are compared.
drop_dup_circuits_df = cleaned_circuits_df.dropDuplicates()
drop_dup_circuits_df.show()

+----------+--------------+--------------------+------------+---------+--------+---------+--------+--------------------+
|circuit_id|   circuit_ref|                name|    location|  country|latitude|longitude|altitude|      ingestion_date|
+----------+--------------+--------------------+------------+---------+--------+---------+--------+--------------------+
|        14|         monza|Autodromo Naziona...|       Monza|    Italy| 45.6156|  9.28111|     162|2023-09-14 18:53:...|
|        21|         imola|Autodromo Enzo e ...|       Imola|    Italy| 44.3439|  11.7167|      37|2023-09-14 18:53:...|
|         1|   albert_park|Albert Park Grand...|   Melbourne|Australia|-37.8497|  144.968|      10|2023-09-14 18:53:...|
|        11|   hungaroring|         Hungaroring|    Budapest|  Hungary| 47.5789|  19.2486|     264|2023-09-14 18:53:...|
|        16|          fuji|       Fuji Speedway|       Oyama|    Japan| 35.3717|  138.927|     583|2023-09-14 18:53:...|
|         2|        sepang|Sepan

#### Filter out records before 1960 and after 2022

In [0]:
# Keep only races whose year lies in the inclusive range 1960-2022.
# NOTE(review): `year` is still a string column at this point (see the
# printSchema output above); the numeric comparison presumably relies on
# Spark's implicit casting — confirm.
filtered_race_df = races_with_timestamp_df.filter(col("year").between(1960, 2022))
filtered_race_df.show()

+------+----+-----+---------+--------------------+----------+--------+--------------------+--------------------+-------------------+
|raceId|year|round|circuitId|                name|      date|    time|                 url|      ingestion_date|     race_timestamp|
+------+----+-----+---------+--------------------+----------+--------+--------------------+--------------------+-------------------+
|     1|2009|    1|        1|Australian Grand ...|2009-03-29|06:00:00|http://en.wikiped...|2023-09-14 18:53:...|2009-03-29 06:00:00|
|     2|2009|    2|        2|Malaysian Grand Prix|2009-04-05|09:00:00|http://en.wikiped...|2023-09-14 18:53:...|2009-04-05 09:00:00|
|     3|2009|    3|       17|  Chinese Grand Prix|2009-04-19|07:00:00|http://en.wikiped...|2023-09-14 18:53:...|2009-04-19 07:00:00|
|     4|2009|    4|        3|  Bahrain Grand Prix|2009-04-26|12:00:00|http://en.wikiped...|2023-09-14 18:53:...|2009-04-26 12:00:00|
|     5|2009|    5|        4|  Spanish Grand Prix|2009-05-10|12:00:00

In [0]:
# Inner-join circuits to the filtered races on the circuit identifier.
join_df = drop_dup_circuits_df.join(
    filtered_race_df,
    drop_dup_circuits_df["circuit_id"] == filtered_race_df["circuitId"],
    "Inner",
)

# Both sides have a `name` column, so every column is referenced through its
# source DataFrame. Note that `name` here is the circuit name.
select_join_df = join_df.select(
    drop_dup_circuits_df["circuit_id"],
    drop_dup_circuits_df["circuit_ref"],
    drop_dup_circuits_df["name"],
    drop_dup_circuits_df["location"],
    drop_dup_circuits_df["country"],
    filtered_race_df["race_timestamp"],
    filtered_race_df["raceId"],
    filtered_race_df["year"],
    drop_dup_circuits_df["altitude"],
)
select_join_df.show()

+----------+-----------+--------------------+------------+---------+-------------------+------+----+--------+
|circuit_id|circuit_ref|                name|    location|  country|     race_timestamp|raceId|year|altitude|
+----------+-----------+--------------------+------------+---------+-------------------+------+----+--------+
|         1|albert_park|Albert Park Grand...|   Melbourne|Australia|2009-03-29 06:00:00|     1|2009|      10|
|         2|     sepang|Sepang Internatio...|Kuala Lumpur| Malaysia|2009-04-05 09:00:00|     2|2009|      18|
|        17|   shanghai|Shanghai Internat...|    Shanghai|    China|2009-04-19 07:00:00|     3|2009|       5|
|         3|    bahrain|Bahrain Internati...|      Sakhir|  Bahrain|2009-04-26 12:00:00|     4|2009|       7|
|         4|  catalunya|Circuit de Barcel...|    Montmeló|    Spain|2009-05-10 12:00:00|     5|2009|     109|
|         6|     monaco|   Circuit de Monaco| Monte-Carlo|   Monaco|2009-05-24 12:00:00|     6|2009|       7|
|         

#### Number of unique circuit names

In [0]:
from pyspark.sql.functions import countDistinct

In [0]:
# Count the distinct values of `name`. In select_join_df this column was taken
# from the circuits DataFrame, so it holds circuit names.
select_join_df.select(countDistinct("name")).show()

+--------------------+
|count(DISTINCT name)|
+--------------------+
|                  68|
+--------------------+



#### Apply GroupBy 

In [0]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

# `altitude` was read from the CSV as a string, so cast it to an integer before aggregating.
# NOTE: this rebinds `select_join_df` to the cast version; later cells see the integer column.
select_join_df = select_join_df.withColumn("altitude", col("altitude").cast(IntegerType()))

In [0]:
# Total the altitude over all joined rows that share the same `name`,
# then give the aggregate column a friendlier name.
altitude_totals = select_join_df.groupBy("name").agg({"altitude": "sum"})
result_df = altitude_totals.withColumnRenamed("sum(altitude)", "sum_altitude")
result_df.show()

+--------------------+------------+
|                name|sum_altitude|
+--------------------+------------+
|           Fair Park|         139|
|Scandinavian Raceway|         918|
|       Istanbul Park|        1040|
|Albert Park Grand...|         250|
|Circuito da Boavista|          28|
|Circuit Gilles Vi...|         533|
|Adelaide Street C...|         638|
|Korean Internatio...|           0|
|      Suzuka Circuit|        1440|
|Buddh Internation...|         582|
|Autodromo Naziona...|        9882|
|   Baku City Circuit|         -35|
| Silverstone Circuit|        7497|
|             A1-Ring|       16950|
|  Yas Marina Circuit|          39|
|   Rouen-Les-Essarts|         243|
|Circuit de Barcel...|        3379|
|Circuit de Spa-Fr...|       18446|
|Autódromo José Ca...|       29830|
|Okayama Internati...|         532|
+--------------------+------------+
only showing top 20 rows



#### Add the results to the existing dataframe

In [0]:
# Attach the per-name total (`sum_altitude`) to every joined row, matching on `name`.
# A left join keeps all rows of select_join_df.
results_df = select_join_df.join(result_df, on="name", how="left")
results_df.show()

+--------------------+----------+-----------+------------+---------+-------------------+------+----+--------+------------+
|                name|circuit_id|circuit_ref|    location|  country|     race_timestamp|raceId|year|altitude|sum_altitude|
+--------------------+----------+-----------+------------+---------+-------------------+------+----+--------+------------+
|Albert Park Grand...|         1|albert_park|   Melbourne|Australia|2009-03-29 06:00:00|     1|2009|      10|         250|
|Sepang Internatio...|         2|     sepang|Kuala Lumpur| Malaysia|2009-04-05 09:00:00|     2|2009|      18|         342|
|Shanghai Internat...|        17|   shanghai|    Shanghai|    China|2009-04-19 07:00:00|     3|2009|       5|          80|
|Bahrain Internati...|         3|    bahrain|      Sakhir|  Bahrain|2009-04-26 12:00:00|     4|2009|       7|         126|
|Circuit de Barcel...|         4|  catalunya|    Montmeló|    Spain|2009-05-10 12:00:00|     5|2009|     109|        3379|
|   Circuit de M

#### Rank the races within each location by year (most recent first) using a window function

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import desc, rank

# Window definition: one partition per `location`, rows ordered by `year`
# from newest to oldest.
driverRankSpec = (
    Window
    .partitionBy("location")
    .orderBy(desc("year"))
)

# Add each row's rank within its location partition.
rank_in_location = rank().over(driverRankSpec)
window_results_df = results_df.withColumn("rank", rank_in_location)
window_results_df.show()

+--------------------+----------+-----------+---------+---------+-------------------+------+----+--------+------------+----+
|                name|circuit_id|circuit_ref| location|  country|     race_timestamp|raceId|year|altitude|sum_altitude|rank|
+--------------------+----------+-----------+---------+---------+-------------------+------+----+--------+------------+----+
|  Yas Marina Circuit|        24| yas_marina|Abu Dhabi|      UAE|2021-12-12 13:00:00|  1073|2021|       3|          39|   1|
|  Yas Marina Circuit|        24| yas_marina|Abu Dhabi|      UAE|2020-12-13 13:10:00|  1047|2020|       3|          39|   2|
|  Yas Marina Circuit|        24| yas_marina|Abu Dhabi|      UAE|2019-12-01 13:10:00|  1030|2019|       3|          39|   3|
|  Yas Marina Circuit|        24| yas_marina|Abu Dhabi|      UAE|2018-11-25 13:10:00|  1009|2018|       3|          39|   4|
|  Yas Marina Circuit|        24| yas_marina|Abu Dhabi|      UAE|2017-11-26 13:00:00|   988|2017|       3|          39|   5|


In [0]:
# Register window_results_df as a temporary view named "ranked_table"
# (replacing any existing view of that name) so it can be queried via spark.sql.
window_results_df.createOrReplaceTempView("ranked_table")

##### Limit ranks to 10 using SQL

In [0]:
# Query that keeps only the rows of the "ranked_table" view whose rank is below 11.
sql_query = """
    SELECT *
    FROM ranked_table
    WHERE rank<11
"""

# Run the query and preview the filtered rows.
grouped_df = spark.sql(sql_query)
grouped_df.show()

+--------------------+----------+-----------+---------+---------+-------------------+------+----+--------+------------+----+
|                name|circuit_id|circuit_ref| location|  country|     race_timestamp|raceId|year|altitude|sum_altitude|rank|
+--------------------+----------+-----------+---------+---------+-------------------+------+----+--------+------------+----+
|  Yas Marina Circuit|        24| yas_marina|Abu Dhabi|      UAE|2021-12-12 13:00:00|  1073|2021|       3|          39|   1|
|  Yas Marina Circuit|        24| yas_marina|Abu Dhabi|      UAE|2020-12-13 13:10:00|  1047|2020|       3|          39|   2|
|  Yas Marina Circuit|        24| yas_marina|Abu Dhabi|      UAE|2019-12-01 13:10:00|  1030|2019|       3|          39|   3|
|  Yas Marina Circuit|        24| yas_marina|Abu Dhabi|      UAE|2018-11-25 13:10:00|  1009|2018|       3|          39|   4|
|  Yas Marina Circuit|        24| yas_marina|Abu Dhabi|      UAE|2017-11-26 13:00:00|   988|2017|       3|          39|   5|


## Load

##### Step4: Write data to data lake file system as a parquet file

In [0]:
# Persist the de-duplicated circuits data as Parquet; "overwrite" mode replaces any existing output at this path.
drop_dup_circuits_df.write.mode("overwrite").parquet("abfss://container@arianazuredl1.dfs.core.windows.net/processed/circuits")

In [0]:
# Read the Parquet output back from the data lake and preview it to verify the write.
df = spark.read.parquet("abfss://container@arianazuredl1.dfs.core.windows.net/processed/circuits")
df.show()

circuit_id,circuit_ref,name,location,country,latitude,longitude,altitude,ingestion_date
1,albert_park,Albert Park Grand Prix Circuit,Melbourne,Australia,-37.8497,144.968,10,2023-09-14T14:47:12.673+0000
2,sepang,Sepang International Circuit,Kuala Lumpur,Malaysia,2.76083,101.738,18,2023-09-14T14:47:12.673+0000
3,bahrain,Bahrain International Circuit,Sakhir,Bahrain,26.0325,50.5106,7,2023-09-14T14:47:12.673+0000
4,catalunya,Circuit de Barcelona-Catalunya,Montmeló,Spain,41.57,2.26111,109,2023-09-14T14:47:12.673+0000
5,istanbul,Istanbul Park,Istanbul,Turkey,40.9517,29.405,130,2023-09-14T14:47:12.673+0000
6,monaco,Circuit de Monaco,Monte-Carlo,Monaco,43.7347,7.42056,7,2023-09-14T14:47:12.673+0000
7,villeneuve,Circuit Gilles Villeneuve,Montreal,Canada,45.5,-73.5228,13,2023-09-14T14:47:12.673+0000
8,magny_cours,Circuit de Nevers Magny-Cours,Magny Cours,France,46.8642,3.16361,228,2023-09-14T14:47:12.673+0000
9,silverstone,Silverstone Circuit,Silverstone,UK,52.0786,-1.01694,153,2023-09-14T14:47:12.673+0000
10,hockenheimring,Hockenheimring,Hockenheim,Germany,49.3278,8.56583,103,2023-09-14T14:47:12.673+0000
