In [1]:
# Import library
import pyspark
from pyspark.sql import SparkSession

In [2]:
# Create a SparkSession
spark = SparkSession.builder.appName("formula-one").getOrCreate()

# Load Constructor Datasets
df_constructor = spark.read.load("/content/constructors.csv",format="csv", sep=",", inferSchema="true", header="true")
df_constructor_standing = spark.read.load("/content/constructor_standings.csv",format="csv", sep=",", inferSchema="true", header="true")
df_constructor_results = spark.read.load("/content/constructor_standings.csv",format="csv", sep=",", inferSchema="true", header="true")

# Show Constructor Datasets
print("=========================== CONSTRUCTOR DATASET ===========================")
df_constructor.show(10)
print("======================== CONSTRUCTOR STANDING DATASET ========================")
df_constructor_standing.show(10)
print("========================= CONSTRUCTOR RESULT DATASET =========================")
df_constructor_results.show(10)

+-------------+--------------+-----------+-----------+--------------------+
|constructorId|constructorRef|       name|nationality|                 url|
+-------------+--------------+-----------+-----------+--------------------+
|            1|       mclaren|    McLaren|    British|http://en.wikiped...|
|            2|    bmw_sauber| BMW Sauber|     German|http://en.wikiped...|
|            3|      williams|   Williams|    British|http://en.wikiped...|
|            4|       renault|    Renault|     French|http://en.wikiped...|
|            5|    toro_rosso| Toro Rosso|    Italian|http://en.wikiped...|
|            6|       ferrari|    Ferrari|    Italian|http://en.wikiped...|
|            7|        toyota|     Toyota|   Japanese|http://en.wikiped...|
|            8|   super_aguri|Super Aguri|   Japanese|http://en.wikiped...|
|            9|      red_bull|   Red Bull|   Austrian|http://en.wikiped...|
|           10|   force_india|Force India|     Indian|http://en.wikiped...|
+-----------

In [4]:
# Create Constructor Tables
df_constructor.createOrReplaceTempView("dim_constructor")
df_constructor_standing.createOrReplaceTempView("dim_constructor_standing")
df_constructor_results.createOrReplaceTempView("dim_constructor_results")

# Merge Constructors Datasets into Staging Table
df_constructor_staging = spark.sql("""
SELECT
  dc.constructorId AS `Constructor ID`,
  dc.constructorRef AS `Constructor Reference`,
  dc.name AS `Constructor Name`,
  dc.nationality AS Nationaliity,
  TRY_CAST(dcs.points AS INT) AS Points,
  dcs.position AS Position,
  dcs.wins AS Win,
  current_timestamp() AS `ETL Date`
FROM dim_constructor_results dcs
LEFT JOIN dim_constructor dc ON dcs.constructorId = dc.constructorId
ORDER BY dc.constructorId;
""")
# Collect into Staging Table
df_constructor_staging.createOrReplaceTempView("stg_dim_constructor")
# Display the result
df_constructor_staging.show(10)
# Save to CSV (overwrite mode)
df_constructor_staging.write.csv("/content/etl-pipeline/stg_dim_constructor.csv", header=True, mode="overwrite")

+--------------+---------------------+----------------+------------+------+--------+---+--------------------+
|Constructor ID|Constructor Reference|Constructor Name|Nationaliity|Points|Position|Win|            ETL Date|
+--------------+---------------------+----------------+------------+------+--------+---+--------------------+
|             1|              mclaren|         McLaren|     British|   100|       2|  5|2025-10-11 12:28:...|
|             1|              mclaren|         McLaren|     British|    76|      11|  2|2025-10-11 12:28:...|
|             1|              mclaren|         McLaren|     British|   113|       2|  5|2025-10-11 12:28:...|
|             1|              mclaren|         McLaren|     British|    42|       3|  1|2025-10-11 12:28:...|
|             1|              mclaren|         McLaren|     British|   119|       2|  5|2025-10-11 12:28:...|
|             1|              mclaren|         McLaren|     British|    14|       1|  1|2025-10-11 12:28:...|
|         

In [5]:
# Load Driver Datasets
# Both files are read as comma-separated CSV with a header row and an inferred schema.
df_driver = (
    spark.read.format("csv")
    .options(sep=",", inferSchema="true", header="true")
    .load("/content/drivers.csv")
)
df_driver_standing = (
    spark.read.format("csv")
    .options(sep=",", inferSchema="true", header="true")
    .load("/content/driver_standings.csv")
)

# Show Driver Datasets: print each banner followed by the first 10 rows of its frame
for _banner, _frame in (
    ("========================================= DRIVER DATASET =========================================", df_driver),
    ("====================== DRIVER STANDING DATASET ======================", df_driver_standing),
):
    print(_banner)
    _frame.show(10)

+--------+----------+------+----+---------+----------+----------+-----------+--------------------+
|driverId| driverRef|number|code| forename|   surname|       dob|nationality|                 url|
+--------+----------+------+----+---------+----------+----------+-----------+--------------------+
|       1|  hamilton|    44| HAM|    Lewis|  Hamilton|1985-01-07|    British|http://en.wikiped...|
|       2|  heidfeld|    \N| HEI|     Nick|  Heidfeld|1977-05-10|     German|http://en.wikiped...|
|       3|   rosberg|     6| ROS|     Nico|   Rosberg|1985-06-27|     German|http://en.wikiped...|
|       4|    alonso|    14| ALO| Fernando|    Alonso|1981-07-29|    Spanish|http://en.wikiped...|
|       5|kovalainen|    \N| KOV|   Heikki|Kovalainen|1981-10-19|    Finnish|http://en.wikiped...|
|       6|  nakajima|    \N| NAK|   Kazuki|  Nakajima|1985-01-11|   Japanese|http://en.wikiped...|
|       7|  bourdais|    \N| BOU|Sébastien|  Bourdais|1979-02-28|     French|http://en.wikiped...|
|       8|

In [6]:
# Create Driver Tables
# Expose both driver frames to Spark SQL under the view names used by the query below.
df_driver_standing.createOrReplaceTempView("dim_driver_standing")
df_driver.createOrReplaceTempView("dim_driver")

# Merge Drivers Datasets into Staging Table
# Every standings row is kept (LEFT JOIN) and enriched with the driver's name,
# number, code and nationality; `ETL Date` stamps the time the query is evaluated.
# NOTE(review): the `Nationaliity` alias is misspelled; it is kept unchanged because
# it is the column header written to the staging view and the CSV.
# NOTE(review): TRY_CAST(... AS INT) drops any fractional part of points - confirm
# that points are always whole numbers.
_driver_staging_query = """
SELECT
  dd.driverId AS `Driver ID`,
  CONCAT(dd.forename, ' ', dd.surname) AS `Driver Name`,
  dd.number AS `Driver Number`,
  dd.code AS `Driver Code`,
  dd.nationality AS Nationaliity,
  TRY_CAST(dds.points AS INT) AS Points,
  dds.position AS Position,
  dds.wins AS Win,
  current_timestamp() AS `ETL Date`
FROM dim_driver_standing dds
LEFT JOIN dim_driver dd ON dds.driverId = dd.driverId
ORDER BY dd.driverId;
"""
df_driver_staging = spark.sql(_driver_staging_query)

# Collect into Staging Table
df_driver_staging.createOrReplaceTempView("stg_dim_driver")

# Display the result (first 10 rows)
df_driver_staging.show(10)

# Save to CSV with a header row, replacing any previous export at this path
(
    df_driver_staging.write
    .mode("overwrite")
    .option("header", True)
    .csv("/content/etl-pipeline/stg_dim_driver.csv")
)

+---------+--------------+-------------+-----------+------------+------+--------+---+--------------------+
|Driver ID|   Driver Name|Driver Number|Driver Code|Nationaliity|Points|Position|Win|            ETL Date|
+---------+--------------+-------------+-----------+------------+------+--------+---+--------------------+
|        1|Lewis Hamilton|           44|        HAM|     British|    38|       4|  2|2025-10-11 12:30:...|
|        1|Lewis Hamilton|           44|        HAM|     British|    98|       1|  5|2025-10-11 12:30:...|
|        1|Lewis Hamilton|           44|        HAM|     British|    48|       1|  3|2025-10-11 12:30:...|
|        1|Lewis Hamilton|           44|        HAM|     British|    14|       1|  1|2025-10-11 12:30:...|
|        1|Lewis Hamilton|           44|        HAM|     British|    58|       1|  4|2025-10-11 12:30:...|
|        1|Lewis Hamilton|           44|        HAM|     British|    10|       1|  1|2025-10-11 12:30:...|
|        1|Lewis Hamilton|           

In [7]:
# Load Results Datasets
# Every file uses the same reader settings: comma-separated CSV, header row, inferred schema.
_results_csv_options = {"format": "csv", "sep": ",", "inferSchema": "true", "header": "true"}

df_results = spark.read.load("/content/results.csv", **_results_csv_options)
df_sprint_results = spark.read.load("/content/sprint_results.csv", **_results_csv_options)
df_circuit = spark.read.load("/content/circuits.csv", **_results_csv_options)
df_laptime = spark.read.load("/content/lap_times.csv", **_results_csv_options)
df_pitstop = spark.read.load("/content/pit_stops.csv", **_results_csv_options)
df_qualifying = spark.read.load("/content/qualifying.csv", **_results_csv_options)
df_races = spark.read.load("/content/races.csv", **_results_csv_options)
df_seasons = spark.read.load("/content/seasons.csv", **_results_csv_options)
df_status = spark.read.load("/content/status.csv", **_results_csv_options)

# Show Results Datasets: print each banner followed by the first 10 rows of its frame
for _banner, _frame in (
    ("================================================================================= RESULTS DATASET =================================================================================", df_results),
    ("================================================================================= SPRINT RESULTS DATASET =====================================================", df_sprint_results),
    ("================================================ CIRCUIT DATASET ===============================================", df_circuit),
    ("=============== LAPTIME DATASET ====================", df_laptime),
    ("======================== PITSTOP DATASET ===========================", df_pitstop),
    ("=================================== QUALIFYING DATASET ============================", df_qualifying),
    ("===================================================================================== RACES DATASET ==========================================================================================", df_races),
    ("===== SEASONS DATASET =====", df_seasons),
    ("=== STATUS DATASET ===", df_status),
):
    print(_banner)
    _frame.show(10)

+--------+------+--------+-------------+------+----+--------+------------+-------------+------+----+-----------+------------+----------+----+--------------+---------------+--------+
|resultId|raceId|driverId|constructorId|number|grid|position|positionText|positionOrder|points|laps|       time|milliseconds|fastestLap|rank|fastestLapTime|fastestLapSpeed|statusId|
+--------+------+--------+-------------+------+----+--------+------------+-------------+------+----+-----------+------------+----------+----+--------------+---------------+--------+
|       1|    18|       1|            1|    22|   1|       1|           1|            1|  10.0|  58|1:34:50.616|     5690616|        39|   2|      1:27.452|        218.300|       1|
|       2|    18|       2|            2|     3|   5|       2|           2|            2|   8.0|  58|     +5.478|     5696094|        41|   3|      1:27.739|        217.586|       1|
|       3|    18|       3|            3|     7|   7|       3|           3|            3|  

In [10]:
# Create Results Tables
# Register every results-related frame as a temp view so Spark SQL can reference it.
df_results.createOrReplaceTempView("dim_results")
df_sprint_results.createOrReplaceTempView("dim_sprint_results")
df_circuit.createOrReplaceTempView("dim_circuit")
df_laptime.createOrReplaceTempView("dim_laptime")
df_pitstop.createOrReplaceTempView("dim_pitstop")
df_qualifying.createOrReplaceTempView("dim_qualifying")
df_races.createOrReplaceTempView("dim_races")
df_seasons.createOrReplaceTempView("dim_seasons")
df_status.createOrReplaceTempView("dim_status")

# Merge Results Datasets into Staging Table
# The sprint results (dsr) drive the query; every other view is LEFT JOINed so that
# each sprint row is kept even when a lookup has no match.
#
# Join-key fixes:
#   * dim_qualifying was joined on raceId only, which pairs each sprint row with
#     every qualifying row of that race (row fan-out, other drivers' Q1/Q2/Q3).
#     It is now matched on raceId AND driverId.
#     NOTE(review): assumes qualifying.csv has a driverId column - confirm.
#   * dim_results was joined on resultId. The sprint and race result files are
#     presumed to number their rows independently, so that join attached Rank and
#     Fastest Lap Speed from unrelated race rows - confirm. It is now matched on
#     raceId AND driverId as well.
# The row count of the staging table therefore differs from earlier runs.
# NOTE(review): `Time` comes from dim_races (ds.time), not from the sprint result
# row (dsr.time) - confirm this is the intended column.
df_results_staging = spark.sql("""
SELECT
  dsr.resultId AS `Result ID`,
  dsr.raceId AS `Race ID`,
  dsr.driverId AS `Driver ID`,
  dsr.constructorId AS `Constructor ID`,
  ds.name AS `Circuit Race`,
  dc.location AS `Circuit Location`,
  dc.country AS `Circuit Country`,
  dc.lat AS Latitude,
  dc.lng AS Longitude,
  dsr.number AS `Driver Number`,
  dsr.grid AS `Grid`,
  dsr.position AS `Position`,
  dsr.positionText AS `Position Text`,
  dsr.positionOrder AS `Position Order`,
  ds.date AS `Date`,
  dsr.points AS `Points`,
  dsr.laps AS `Laps`,
  ds.time AS `Time`,
  dsr.milliseconds AS `Milliseconds`,
  dsr.fastestLap AS `Fastest Lap`,
  dr.rank AS `Rank`,
  dq.q1 AS `Q1`,
  dq.q2 AS `Q2`,
  dq.q3 AS `Q3`,
  dsr.fastestLapTime AS `Fastest Lap Time`,
  dr.fastestLapSpeed AS `Fastest Lap Speed`,
  dsr.statusId AS `Status ID`,
  dt.status AS `Status`,
  current_timestamp() AS `ETL Date`
FROM dim_sprint_results dsr
LEFT JOIN dim_results dr ON dsr.raceId = dr.raceId AND dsr.driverId = dr.driverId
LEFT JOIN dim_qualifying dq ON dsr.raceId = dq.raceId AND dsr.driverId = dq.driverId
LEFT JOIN dim_races ds ON dsr.raceId = ds.raceId
LEFT JOIN dim_circuit dc ON ds.circuitId = dc.circuitId
LEFT JOIN dim_status dt ON dsr.statusId = dt.statusId
ORDER BY dsr.resultId;
""")
# Collect into Staging Table
df_results_staging.createOrReplaceTempView("stg_dim_result")
# Display the result
df_results_staging.show(10)
# Save to CSV (overwrite mode); Spark writes a directory of part files at this path
df_results_staging.write.csv("/content/etl-pipeline/stg_dim_result.csv", header=True, mode="overwrite")

+---------+-------+---------+--------------+------------------+----------------+---------------+--------+---------+-------------+----+--------+-------------+--------------+----------+------+----+--------+------------+-----------+----+--------+--------+---+----------------+-----------------+---------+--------+--------------------+
|Result ID|Race ID|Driver ID|Constructor ID|      Circuit Race|Circuit Location|Circuit Country|Latitude|Longitude|Driver Number|Grid|Position|Position Text|Position Order|      Date|Points|Laps|    Time|Milliseconds|Fastest Lap|Rank|      Q1|      Q2| Q3|Fastest Lap Time|Fastest Lap Speed|Status ID|  Status|            ETL Date|
+---------+-------+---------+--------------+------------------+----------------+---------------+--------+---------+-------------+----+--------+-------------+--------------+----------+------+----+--------+------------+-----------+----+--------+--------+---+----------------+-----------------+---------+--------+--------------------+
|   

In [13]:
# Download ETL-Pipeline Folder
from google.colab import files
import shutil

# Zip the contents of the staging export folder. The archive is created next to
# the folder, at the base name plus the ".zip" extension.
shutil.make_archive(
    base_name="/content/etl-pipeline",
    format="zip",
    root_dir="/content/etl-pipeline",
)

# Hand the archive to the Colab download helper
files.download("/content/etl-pipeline.zip")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>