## **1. ingest_circuits_file**

In [13]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
#Entrypoint 2.x
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
spark = SparkSession.builder.config("spark.sql.shuffle.partitions", "2").appName("Analysis").master("local[2]").getOrCreate()

# On yarn:
# spark = SparkSession.builder.appName("Spark SQL basic example").enableHiveSupport().master("yarn").getOrCreate()
# specify .master("yarn")

sc = spark.sparkContext

In [None]:
circuits_schema = StructType(fields=[
    StructField("circuitId", IntegerType(), False),
    StructField("circuitRef", StringType(), True),
    StructField("name", StringType(), True),
    StructField("location", StringType(), True),
    StructField("country", StringType(), True),
    StructField("lat", DoubleType(), True),
    StructField("lng", DoubleType(), True),
    StructField("alt", IntegerType(), True),
    StructField("url", StringType(), True),
])

In [85]:
#read file and apply schema

circuits_df = spark.read \
.option("header", True) \
.schema(circuits_schema) \
.csv("/content/drive/MyDrive/Formula1DataAnalytics-main/Formula1DataAnalytics-main/data/circuits.csv")
circuits_df.printSchema()
circuits_df.show(5)

root
 |-- circuitId: integer (nullable = true)
 |-- circuitRef: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- country: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lng: double (nullable = true)
 |-- alt: integer (nullable = true)
 |-- url: string (nullable = true)

+---------+-----------+--------------------+------------+---------+--------+-------+----+--------------------+
|circuitId| circuitRef|                name|    location|  country|     lat|    lng| alt|                 url|
+---------+-----------+--------------------+------------+---------+--------+-------+----+--------------------+
|        1|albert_park|Albert Park Grand...|   Melbourne|Australia|-37.8497|144.968|  10|http://en.wikiped...|
|        2|     sepang|Sepang Internatio...|Kuala Lumpur| Malaysia| 2.76083|101.738|NULL|http://en.wikiped...|
|        3|    bahrain|Bahrain Internati...|      Sakhir|  Bahrain| 26.0325|50.5106|NULL|http://en.

In [86]:
circuits_df.show(5)

+---------+-----------+--------------------+------------+---------+--------+-------+----+--------------------+
|circuitId| circuitRef|                name|    location|  country|     lat|    lng| alt|                 url|
+---------+-----------+--------------------+------------+---------+--------+-------+----+--------------------+
|        1|albert_park|Albert Park Grand...|   Melbourne|Australia|-37.8497|144.968|  10|http://en.wikiped...|
|        2|     sepang|Sepang Internatio...|Kuala Lumpur| Malaysia| 2.76083|101.738|NULL|http://en.wikiped...|
|        3|    bahrain|Bahrain Internati...|      Sakhir|  Bahrain| 26.0325|50.5106|NULL|http://en.wikiped...|
|        4|  catalunya|Circuit de Barcel...|    Montmeló|    Spain|   41.57|2.26111|NULL|http://en.wikiped...|
|        5|   istanbul|       Istanbul Park|    Istanbul|   Turkey| 40.9517| 29.405|NULL|http://en.wikiped...|
+---------+-----------+--------------------+------------+---------+--------+-------+----+--------------------+
o

In [87]:
circuits_df.printSchema()

root
 |-- circuitId: integer (nullable = true)
 |-- circuitRef: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- country: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lng: double (nullable = true)
 |-- alt: integer (nullable = true)
 |-- url: string (nullable = true)



In [88]:
circuits_df.describe().show()

+-------+-----------------+----------+-------+---------+---------+------------------+------------------+----+--------------------+
|summary|        circuitId|circuitRef|   name| location|  country|               lat|               lng| alt|                 url|
+-------+-----------------+----------+-------+---------+---------+------------------+------------------+----+--------------------+
|  count|               74|        74|     74|       74|       74|                74|                74|   1|                  74|
|   mean|             37.5|      NULL|   NULL|     NULL|     NULL|33.698638243243224|3.1288148648648644|10.0|                NULL|
| stddev|21.50581316760657|      NULL|   NULL|     NULL|     NULL| 23.27327352478035| 66.04182770715761|NULL|                NULL|
|    min|                1|       BAK|A1-Ring|Abu Dhabi|Argentina|          -37.8497|          -118.189|  10|http://en.wikiped...|
|    max|               74|    zolder| Zolder|Zandvoort|  Vietnam|           57.265

In [89]:
## Remove unwanted columns

In [90]:
circuits_df_selected = circuits_df.select(col("circuitId"), col("circuitRef"), col("name"), col("location"), col("country"), col("lat"), col("lng"), col("alt"))

In [91]:
circuits_df_selected.show(5)

+---------+-----------+--------------------+------------+---------+--------+-------+----+
|circuitId| circuitRef|                name|    location|  country|     lat|    lng| alt|
+---------+-----------+--------------------+------------+---------+--------+-------+----+
|        1|albert_park|Albert Park Grand...|   Melbourne|Australia|-37.8497|144.968|  10|
|        2|     sepang|Sepang Internatio...|Kuala Lumpur| Malaysia| 2.76083|101.738|NULL|
|        3|    bahrain|Bahrain Internati...|      Sakhir|  Bahrain| 26.0325|50.5106|NULL|
|        4|  catalunya|Circuit de Barcel...|    Montmeló|    Spain|   41.57|2.26111|NULL|
|        5|   istanbul|       Istanbul Park|    Istanbul|   Turkey| 40.9517| 29.405|NULL|
+---------+-----------+--------------------+------------+---------+--------+-------+----+
only showing top 5 rows



## Rename the columns as required

In [92]:
circuits_renamed_df = circuits_df_selected.withColumnRenamed("circuitId", "circuit_id")\
.withColumnRenamed("circuitRef", "circuit_ref").withColumnRenamed("lat", "latitude")\
.withColumnRenamed("lng", "longitude").withColumnRenamed("alt", "altitude")

In [93]:
circuits_renamed_df.show(5)

+----------+-----------+--------------------+------------+---------+--------+---------+--------+
|circuit_id|circuit_ref|                name|    location|  country|latitude|longitude|altitude|
+----------+-----------+--------------------+------------+---------+--------+---------+--------+
|         1|albert_park|Albert Park Grand...|   Melbourne|Australia|-37.8497|  144.968|      10|
|         2|     sepang|Sepang Internatio...|Kuala Lumpur| Malaysia| 2.76083|  101.738|    NULL|
|         3|    bahrain|Bahrain Internati...|      Sakhir|  Bahrain| 26.0325|  50.5106|    NULL|
|         4|  catalunya|Circuit de Barcel...|    Montmeló|    Spain|   41.57|  2.26111|    NULL|
|         5|   istanbul|       Istanbul Park|    Istanbul|   Turkey| 40.9517|   29.405|    NULL|
+----------+-----------+--------------------+------------+---------+--------+---------+--------+
only showing top 5 rows



## Write the output to processed container in parquet container


In [94]:
# prompt: Write the output to processed container in parquet container


circuits_renamed_df.write.mode("overwrite").parquet("/content/drive/MyDrive/Formula1DataAnalytics-main/Formula1DataAnalytics-main/processed/circuits")

## **2.ingest_races_file**

##  Define Schema

In [71]:
races_schema = StructType(fields=[
    StructField("raceId", IntegerType(), False),
    StructField("year", IntegerType(), True),
    StructField("round", IntegerType(), True),
    StructField("circuitId", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("date", DateType(), True),
    StructField("time", StringType(), True),
    StructField("url", StringType(), True),
])

## Read the CSV file from drive & apply schema

In [95]:
races_df = spark.read \
.option("header", True) \
.schema(races_schema) \
.csv("/content/drive/MyDrive/Formula1DataAnalytics-main/Formula1DataAnalytics-main/data/races.csv")
circuits_df.printSchema()
circuits_df.show(5)

root
 |-- circuitId: integer (nullable = true)
 |-- circuitRef: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- country: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lng: double (nullable = true)
 |-- alt: integer (nullable = true)
 |-- url: string (nullable = true)

+---------+-----------+--------------------+------------+---------+--------+-------+----+--------------------+
|circuitId| circuitRef|                name|    location|  country|     lat|    lng| alt|                 url|
+---------+-----------+--------------------+------------+---------+--------+-------+----+--------------------+
|        1|albert_park|Albert Park Grand...|   Melbourne|Australia|-37.8497|144.968|  10|http://en.wikiped...|
|        2|     sepang|Sepang Internatio...|Kuala Lumpur| Malaysia| 2.76083|101.738|NULL|http://en.wikiped...|
|        3|    bahrain|Bahrain Internati...|      Sakhir|  Bahrain| 26.0325|50.5106|NULL|http://en.

In [96]:
races_df.show(5)

+------+----+-----+---------+--------------------+----------+--------+--------------------+
|raceId|year|round|circuitId|                name|      date|    time|                 url|
+------+----+-----+---------+--------------------+----------+--------+--------------------+
|     1|2009|    1|        1|Australian Grand ...|2009-03-29|06:00:00|http://en.wikiped...|
|     2|2009|    2|        2|Malaysian Grand Prix|2009-04-05|09:00:00|http://en.wikiped...|
|     3|2009|    3|       17|  Chinese Grand Prix|2009-04-19|07:00:00|http://en.wikiped...|
|     4|2009|    4|        3|  Bahrain Grand Prix|2009-04-26|12:00:00|http://en.wikiped...|
|     5|2009|    5|        4|  Spanish Grand Prix|2009-05-10|12:00:00|http://en.wikiped...|
+------+----+-----+---------+--------------------+----------+--------+--------------------+
only showing top 5 rows



In [97]:
races_selected_df = races_df.select(col("raceId").alias('race_id'), col("year").alias('race_year'), col("round"), col('circuitId').alias('circuit_id'), col("name"))

In [98]:
races_df.show(5)

+------+----+-----+---------+--------------------+----------+--------+--------------------+
|raceId|year|round|circuitId|                name|      date|    time|                 url|
+------+----+-----+---------+--------------------+----------+--------+--------------------+
|     1|2009|    1|        1|Australian Grand ...|2009-03-29|06:00:00|http://en.wikiped...|
|     2|2009|    2|        2|Malaysian Grand Prix|2009-04-05|09:00:00|http://en.wikiped...|
|     3|2009|    3|       17|  Chinese Grand Prix|2009-04-19|07:00:00|http://en.wikiped...|
|     4|2009|    4|        3|  Bahrain Grand Prix|2009-04-26|12:00:00|http://en.wikiped...|
|     5|2009|    5|        4|  Spanish Grand Prix|2009-05-10|12:00:00|http://en.wikiped...|
+------+----+-----+---------+--------------------+----------+--------+--------------------+
only showing top 5 rows



In [99]:
races_selected_df.show(5)

+-------+---------+-----+----------+--------------------+
|race_id|race_year|round|circuit_id|                name|
+-------+---------+-----+----------+--------------------+
|      1|     2009|    1|         1|Australian Grand ...|
|      2|     2009|    2|         2|Malaysian Grand Prix|
|      3|     2009|    3|        17|  Chinese Grand Prix|
|      4|     2009|    4|         3|  Bahrain Grand Prix|
|      5|     2009|    5|         4|  Spanish Grand Prix|
+-------+---------+-----+----------+--------------------+
only showing top 5 rows



In [100]:
races_selected_df.write.mode("overwrite").parquet("/content/drive/MyDrive/Formula1DataAnalytics-main/Formula1DataAnalytics-main/processed/races")

## **3.ingest_constructors_file**

## Define schema

In [101]:
constructors_schema = "constructorId INT, constructorRef STRING, name STRING, nationality STRING, url STRING"


## Read the csv file from drive & apply schema

In [102]:
constructor_df = spark.read.schema(constructors_schema).csv("/content/drive/MyDrive/Formula1DataAnalytics-main/Formula1DataAnalytics-main/data/constructors.csv")


In [103]:
constructor_df.show(5)


+-------------+--------------+----------+-----------+--------------------+
|constructorId|constructorRef|      name|nationality|                 url|
+-------------+--------------+----------+-----------+--------------------+
|         NULL|constructorRef|      name|nationality|                 url|
|            1|       mclaren|   McLaren|    British|http://en.wikiped...|
|            2|    bmw_sauber|BMW Sauber|     German|http://en.wikiped...|
|            3|      williams|  Williams|    British|http://en.wikiped...|
|            4|       renault|   Renault|     French|http://en.wikiped...|
+-------------+--------------+----------+-----------+--------------------+
only showing top 5 rows



## Drop unwanted columns from the dataframe

In [104]:
constructor_dropped_df = constructor_df.drop(col('url'))


## Rename column

In [105]:
constructor_final_df = constructor_dropped_df.withColumnRenamed("constructorId", "constructor_id")\
.withColumnRenamed("constructorRef", "constructor_ref")

In [106]:
constructor_final_df.show(5)

+--------------+---------------+----------+-----------+
|constructor_id|constructor_ref|      name|nationality|
+--------------+---------------+----------+-----------+
|          NULL| constructorRef|      name|nationality|
|             1|        mclaren|   McLaren|    British|
|             2|     bmw_sauber|BMW Sauber|     German|
|             3|       williams|  Williams|    British|
|             4|        renault|   Renault|     French|
+--------------+---------------+----------+-----------+
only showing top 5 rows



## Write outut to parquet file


In [107]:
constructor_final_df.write.mode("overwrite").parquet("/content/drive/MyDrive/Formula1DataAnalytics-main/Formula1DataAnalytics-main/processed/constructors")

## **4.ingest_drivers_file**

## Read the CSV file from drive & apply schema

In [108]:
drivers_df = spark.read.csv("/content/drive/MyDrive/Formula1DataAnalytics-main/Formula1DataAnalytics-main/data/drivers.csv",header=True)


In [109]:
drivers_df.show(5)


+--------+----------+------+----+--------+----------+----------+-----------+--------------------+
|driverId| driverRef|number|code|forename|   surname|       dob|nationality|                 url|
+--------+----------+------+----+--------+----------+----------+-----------+--------------------+
|       1|  hamilton|    44| HAM|   Lewis|  Hamilton|1985-01-07|    British|http://en.wikiped...|
|       2|  heidfeld|    \N| HEI|    Nick|  Heidfeld|1977-05-10|     German|http://en.wikiped...|
|       3|   rosberg|     6| ROS|    Nico|   Rosberg|1985-06-27|     German|http://en.wikiped...|
|       4|    alonso|    14| ALO|Fernando|    Alonso|1981-07-29|    Spanish|http://en.wikiped...|
|       5|kovalainen|    \N| KOV|  Heikki|Kovalainen|1981-10-19|    Finnish|http://en.wikiped...|
+--------+----------+------+----+--------+----------+----------+-----------+--------------------+
only showing top 5 rows



## Rename columns & concatenate forename & surname


In [110]:
drivers_with_columns_df = drivers_df.withColumnRenamed("driverId", "driver_id")\
.withColumnRenamed("driverRef", "driver_ref")\
.withColumn("name", concat(col("forename"), lit(" "), col("surname")))

In [111]:
drivers_with_columns_df.show(5)


+---------+----------+------+----+--------+----------+----------+-----------+--------------------+-----------------+
|driver_id|driver_ref|number|code|forename|   surname|       dob|nationality|                 url|             name|
+---------+----------+------+----+--------+----------+----------+-----------+--------------------+-----------------+
|        1|  hamilton|    44| HAM|   Lewis|  Hamilton|1985-01-07|    British|http://en.wikiped...|   Lewis Hamilton|
|        2|  heidfeld|    \N| HEI|    Nick|  Heidfeld|1977-05-10|     German|http://en.wikiped...|    Nick Heidfeld|
|        3|   rosberg|     6| ROS|    Nico|   Rosberg|1985-06-27|     German|http://en.wikiped...|     Nico Rosberg|
|        4|    alonso|    14| ALO|Fernando|    Alonso|1981-07-29|    Spanish|http://en.wikiped...|  Fernando Alonso|
|        5|kovalainen|    \N| KOV|  Heikki|Kovalainen|1981-10-19|    Finnish|http://en.wikiped...|Heikki Kovalainen|
+---------+----------+------+----+--------+----------+----------

## Drop unwanted columns

In [112]:
drivers_final_df = drivers_with_columns_df.drop(col("url"))


## Write outut to parquet file


In [113]:
drivers_final_df.write.mode("overwrite").parquet("/content/drive/MyDrive/Formula1DataAnalytics-main/Formula1DataAnalytics-main/processed/drivers")

## **5.ingest_results_file**

## Define schema


In [37]:
results_schema = StructType(fields=[
    StructField("resultId", IntegerType(), False),
    StructField("raceId", IntegerType(), True),
    StructField("driverId", IntegerType(), True),
    StructField("constructorId", IntegerType(), True),
    StructField("number", IntegerType(), True),
    StructField("grid", IntegerType(), True),
    StructField("position", IntegerType(), True),
    StructField("positionText", StringType(), True),
    StructField("positionOrder", IntegerType(), True),
    StructField("points", FloatType(), True),
    StructField("laps", IntegerType(), True),
    StructField("time", StringType(), True),
    StructField("milliseconds", IntegerType(), True),
    StructField("fastestLap", IntegerType(), True),
    StructField("rank", IntegerType(), True),
    StructField("fastestLapTime", StringType(), True),
    StructField("fastestLapSpeed", FloatType(), True),
    StructField("statusId", StringType(), True),
])

## Read the CSV file from drive & apply schema


In [38]:
results_df = spark.read.schema(results_schema).csv("/content/drive/MyDrive/Formula1DataAnalytics-main/Formula1DataAnalytics-main/data/results.csv")


## Rename columns

In [39]:
results_with_columns_df = results_df.withColumnRenamed("resultId", "result_id")\
.withColumnRenamed("raceId", "race_id").withColumnRenamed("driverId", "driver_id")\
.withColumnRenamed("constructorId", "constructor_id").withColumnRenamed("positionText", "position_text")\
.withColumnRenamed("positionOrder", "position_order").withColumnRenamed("fastestLap", "fastest_lap")\
.withColumnRenamed("fastestLapTime", "fastest_lap_time").withColumnRenamed("fastestLapSpeed", "fastest_lap_speed")

In [40]:
results_with_columns_df.columns


['result_id',
 'race_id',
 'driver_id',
 'constructor_id',
 'number',
 'grid',
 'position',
 'position_text',
 'position_order',
 'points',
 'laps',
 'time',
 'milliseconds',
 'fastest_lap',
 'rank',
 'fastest_lap_time',
 'fastest_lap_speed',
 'statusId']

## Drop unwanted columns


In [41]:
results_final_df = results_with_columns_df.drop(col("statusId"))


## Write outut to parquet file


In [42]:
results_final_df.write.mode('overwrite').parquet("/content/drive/MyDrive/Formula1DataAnalytics-main/Formula1DataAnalytics-main/processed/results")

## **6.ingest_pit_stops_file**


## Define schema


In [43]:
pit_stops_schema = StructType([
    StructField("raceId", IntegerType(), False),
    StructField("driverId", IntegerType(), True),
    StructField("stop", StringType(), True),
    StructField("lap", IntegerType(), True),
    StructField("time", StringType(), True),
    StructField("duration", StringType(), True),
    StructField("milliseconds", IntegerType(), True),
])

## Read the CSV file from drive & apply schema


In [44]:
pit_stops_df = spark.read.schema(pit_stops_schema).csv("/content/drive/MyDrive/Formula1DataAnalytics-main/Formula1DataAnalytics-main/data/pit_stops.csv")

In [45]:
pit_stops_df.show(5)


+------+--------+----+----+--------+--------+------------+
|raceId|driverId|stop| lap|    time|duration|milliseconds|
+------+--------+----+----+--------+--------+------------+
|  NULL|    NULL|stop|NULL|    time|duration|        NULL|
|   841|     153|   1|   1|17:05:23|  26.898|       26898|
|   841|      30|   1|   1|17:05:52|  25.021|       25021|
|   841|      17|   1|  11|17:20:48|  23.426|       23426|
|   841|       4|   1|  12|17:22:34|  23.251|       23251|
+------+--------+----+----+--------+--------+------------+
only showing top 5 rows



## Rename columns


In [46]:
final_df = pit_stops_df.withColumnRenamed("driverId", "driver_id").withColumnRenamed("raceId", "race_id")


## Write outut to parquet file


In [47]:
final_df.write.mode('overwrite').parquet("/content/drive/MyDrive/Formula1DataAnalytics-main/Formula1DataAnalytics-main/processed/pit_stops")

## **7.ingest_lap_times_file**

## Define schema


In [48]:
lap_times_schema = StructType(fields=[
    StructField("raceId", IntegerType(), False),
    StructField("driverId", IntegerType(), True),
    StructField("lap", IntegerType(), True),
    StructField("position", IntegerType(), True),
    StructField("time", StringType(), True),
    StructField("milliseconds", IntegerType(), True),
])

## Read the csv file from drive & apply schema


In [49]:
lap_times_df = spark.read.schema(lap_times_schema).csv("/content/drive/MyDrive/Formula1DataAnalytics-main/Formula1DataAnalytics-main/data/lap_times.csv")


## Rename columns


In [50]:
final_df = lap_times_df.withColumnRenamed("driverId", "driver_id").withColumnRenamed("raceId", "race_id")


## Write outut to parquet file


In [51]:
final_df.write.mode('overwrite').parquet("/content/drive/MyDrive/Formula1DataAnalytics-main/Formula1DataAnalytics-main/processed/lap_times")

##** 8.ingest_qualifying_file**

## Define schema


In [114]:
qualifying_schema = StructType(fields=[
    StructField("qualifyId", IntegerType(), False),
    StructField("raceId", IntegerType(), True),
    StructField("driverId", IntegerType(), True),
    StructField("constructorId", IntegerType(), True),
    StructField("number", IntegerType(), True),
    StructField("position", IntegerType(), True),
    StructField("q1", StringType(), True),
    StructField("q2", StringType(), True),
    StructField("q3", StringType(), True),
])

## Read the JSON file from drive & apply schema


In [115]:
qualifying_df = spark.read.schema(qualifying_schema).csv("/content/drive/MyDrive/Formula1DataAnalytics-main/Formula1DataAnalytics-main/data/qualifying.csv")


In [116]:
qualifying_df.show(5)


+---------+------+--------+-------------+------+--------+--------+--------+--------+
|qualifyId|raceId|driverId|constructorId|number|position|      q1|      q2|      q3|
+---------+------+--------+-------------+------+--------+--------+--------+--------+
|     NULL|  NULL|    NULL|         NULL|  NULL|    NULL|      q1|      q2|      q3|
|        1|    18|       1|            1|    22|       1|1:26.572|1:25.187|1:26.714|
|        2|    18|       9|            2|     4|       2|1:26.103|1:25.315|1:26.869|
|        3|    18|       5|            1|    23|       3|1:25.664|1:25.452|1:27.079|
|        4|    18|      13|            6|     2|       4|1:25.994|1:25.691|1:27.178|
+---------+------+--------+-------------+------+--------+--------+--------+--------+
only showing top 5 rows



## Rename columns


In [117]:
final_df = qualifying_df.withColumnRenamed("qualifyId", "qualify_id")\
.withColumnRenamed("raceId", "race_id").withColumnRenamed("driverId", "driver_id")\
.withColumnRenamed("constructorId", "constructor_id").withColumn("ingestion_date", current_timestamp())

## Write outut to parquet file


In [118]:
final_df.write.mode('overwrite').parquet("/content/drive/MyDrive/Formula1DataAnalytics-main/Formula1DataAnalytics-main/processed/qualifying")