# **Creating Spark Context**

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [6]:
#Entrypoint 2.x
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
spark = SparkSession.builder.config("spark.sql.shuffle.partitions", "2").appName("Analysis").master("local[2]").getOrCreate()

# On yarn:
# spark = SparkSession.builder.appName("Spark SQL basic example").enableHiveSupport().master("yarn").getOrCreate()
# specify .master("yarn")

sc = spark.sparkContext

# **Defining schema**

In [3]:
# circuits
circuits_schema = StructType(fields=[
    StructField("circuitId", IntegerType(), False),
    StructField("circuitRef", StringType(), True),
    StructField("name", StringType(), True),
    StructField("location", StringType(), True),
    StructField("country", StringType(), True),
    StructField("lat", DoubleType(), True),
    StructField("lng", DoubleType(), True),
    StructField("alt", IntegerType(), True),
    StructField("url", StringType(), True),
])

# races
races_schema = StructType(fields=[
    StructField("raceId", IntegerType(), False),
    StructField("year", IntegerType(), True),
    StructField("round", IntegerType(), True),
    StructField("circuitId", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("date", DateType(), True),
    StructField("time", StringType(), True),
    StructField("url", StringType(), True),
])

# constructor
constructors_schema = "constructorId INT, constructorRef STRING, name STRING, nationality STRING, url STRING"

# results
results_schema = StructType(fields=[
    StructField("resultId", IntegerType(), False),
    StructField("raceId", IntegerType(), True),
    StructField("driverId", IntegerType(), True),
    StructField("constructorId", IntegerType(), True),
    StructField("number", IntegerType(), True),
    StructField("grid", IntegerType(), True),
    StructField("position", IntegerType(), True),
    StructField("positionText", StringType(), True),
    StructField("positionOrder", IntegerType(), True),
    StructField("points", FloatType(), True),
    StructField("laps", IntegerType(), True),
    StructField("time", StringType(), True),
    StructField("milliseconds", IntegerType(), True),
    StructField("fastestLap", IntegerType(), True),
    StructField("rank", IntegerType(), True),
    StructField("fastestLapTime", StringType(), True),
    StructField("fastestLapSpeed", FloatType(), True),
    StructField("statusId", StringType(), True),
])

# pit stop
pit_stops_schema = StructType([
    StructField("raceId", IntegerType(), False),
    StructField("driverId", IntegerType(), True),
    StructField("stop", StringType(), True),
    StructField("lap", IntegerType(), True),
    StructField("time", StringType(), True),
    StructField("duration", StringType(), True),
    StructField("milliseconds", IntegerType(), True),
])

# lap time
lap_times_schema = StructType(fields=[
    StructField("raceId", IntegerType(), False),
    StructField("driverId", IntegerType(), True),
    StructField("lap", IntegerType(), True),
    StructField("position", IntegerType(), True),
    StructField("time", StringType(), True),
    StructField("milliseconds", IntegerType(), True),
])

# qualifying
qualifying_schema = StructType(fields=[
    StructField("qualifyId", IntegerType(), False),
    StructField("raceId", IntegerType(), True),
    StructField("driverId", IntegerType(), True),
    StructField("constructorId", IntegerType(), True),
    StructField("number", IntegerType(), True),
    StructField("position", IntegerType(), True),
    StructField("q1", StringType(), True),
    StructField("q2", StringType(), True),
    StructField("q3", StringType(), True),
])

# for driver we are directly reading the csv file

# **Reading data from JSON and applying the structure to the data to convert it to CSV**

In [4]:
data = "/content/drive/MyDrive/DBDA_PROJECT/Formula1DataAnalytics-main/Main/data"
processed_folder_path = "/content/drive/MyDrive/DBDA_PROJECT/Formula1DataAnalytics-main/Main/processed_folder_path"

In [8]:
circuits_df = spark.read.option("header", True).schema(circuits_schema).csv(f"{data}/circuits.csv")

races_df = spark.read.option("header", True).schema(races_schema).csv(f"{data}/races.csv")

constructor_df = spark.read.schema(constructors_schema).csv(f"{data}/constructors.csv", header = True)

results_df = spark.read.schema(results_schema).csv(f"{data}/results.csv", header = True)

pit_stops_df = spark.read.schema(pit_stops_schema).option("multiLine", True).json(f"{data}/pit_stops.csv")

lap_times_df = spark.read.schema(lap_times_schema).csv(f"{data}/lap_times.csv")

qualifying_df = spark.read.schema(qualifying_schema).csv(f"{data}/qualifying.csv", header = True)

drivers_df = spark.read.csv(f"{data}/drivers.csv", header = True)

# **Displaying the data**

In [9]:
circuits_df.show(5)

races_df.show(5)

constructor_df.show(5)

results_df.show(5)

pit_stops_df.show(5)

lap_times_df.show(5)

qualifying_df.show(5)

drivers_df.show(5)

+---------+-----------+--------------------+------------+---------+--------+-------+----+--------------------+
|circuitId| circuitRef|                name|    location|  country|     lat|    lng| alt|                 url|
+---------+-----------+--------------------+------------+---------+--------+-------+----+--------------------+
|        1|albert_park|Albert Park Grand...|   Melbourne|Australia|-37.8497|144.968|  10|http://en.wikiped...|
|        2|     sepang|Sepang Internatio...|Kuala Lumpur| Malaysia| 2.76083|101.738|NULL|http://en.wikiped...|
|        3|    bahrain|Bahrain Internati...|      Sakhir|  Bahrain| 26.0325|50.5106|NULL|http://en.wikiped...|
|        4|  catalunya|Circuit de Barcel...|    Montmeló|    Spain|   41.57|2.26111|NULL|http://en.wikiped...|
|        5|   istanbul|       Istanbul Park|    Istanbul|   Turkey| 40.9517| 29.405|NULL|http://en.wikiped...|
+---------+-----------+--------------------+------------+---------+--------+-------+----+--------------------+
o

# **Printing the schema**

In [10]:
circuits_df.printSchema()
races_df.printSchema()
constructor_df.printSchema()
results_df.printSchema()
pit_stops_df.printSchema()
lap_times_df.printSchema()
qualifying_df.printSchema()
drivers_df.printSchema()

root
 |-- circuitId: integer (nullable = true)
 |-- circuitRef: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- country: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lng: double (nullable = true)
 |-- alt: integer (nullable = true)
 |-- url: string (nullable = true)

root
 |-- raceId: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- round: integer (nullable = true)
 |-- circuitId: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- date: date (nullable = true)
 |-- time: string (nullable = true)
 |-- url: string (nullable = true)

root
 |-- constructorId: integer (nullable = true)
 |-- constructorRef: string (nullable = true)
 |-- name: string (nullable = true)
 |-- nationality: string (nullable = true)
 |-- url: string (nullable = true)

root
 |-- resultId: integer (nullable = true)
 |-- raceId: integer (nullable = true)
 |-- driverId: integer (nullable = true)
 |-- construc

In [11]:
circuits_df.describe().show(5)
races_df.describe().show(5)
constructor_df.describe().show(5)
results_df.describe().show(5)
pit_stops_df.describe().show(5)
lap_times_df.describe().show(5)
qualifying_df.describe().show(5)

+-------+-----------------+----------+-------+---------+---------+------------------+------------------+----+--------------------+
|summary|        circuitId|circuitRef|   name| location|  country|               lat|               lng| alt|                 url|
+-------+-----------------+----------+-------+---------+---------+------------------+------------------+----+--------------------+
|  count|               74|        74|     74|       74|       74|                74|                74|   1|                  74|
|   mean|             37.5|      NULL|   NULL|     NULL|     NULL|33.698638243243224|3.1288148648648644|10.0|                NULL|
| stddev|21.50581316760657|      NULL|   NULL|     NULL|     NULL| 23.27327352478035| 66.04182770715761|NULL|                NULL|
|    min|                1|       BAK|A1-Ring|Abu Dhabi|Argentina|          -37.8497|          -118.189|  10|http://en.wikiped...|
|    max|               74|    zolder| Zolder|Zandvoort|  Vietnam|           57.265

In [12]:
circuits_df_selected = circuits_df.select(col("circuitId"), col("circuitRef"), col("name"), col("location"), col("country"), col("lat"), col("lng"), col("alt"))
races_selected_df = races_df.select(col("raceId").alias('race_id'), col("year").alias('race_year'), col("round"), col('circuitId').alias('circuit_id'), col("name"))
constructor_selected_df = constructor_df.select(col("constructorId"), col("constructorRef"), col("name"), col("nationality"), col("url"))

In [13]:
circuits_df_selected.show(5)
races_selected_df.show(5)
constructor_df.show(5)
drivers_df.show(5)
pit_stops_df.show(5)
qualifying_df.show(5)

+---------+-----------+--------------------+------------+---------+--------+-------+----+
|circuitId| circuitRef|                name|    location|  country|     lat|    lng| alt|
+---------+-----------+--------------------+------------+---------+--------+-------+----+
|        1|albert_park|Albert Park Grand...|   Melbourne|Australia|-37.8497|144.968|  10|
|        2|     sepang|Sepang Internatio...|Kuala Lumpur| Malaysia| 2.76083|101.738|NULL|
|        3|    bahrain|Bahrain Internati...|      Sakhir|  Bahrain| 26.0325|50.5106|NULL|
|        4|  catalunya|Circuit de Barcel...|    Montmeló|    Spain|   41.57|2.26111|NULL|
|        5|   istanbul|       Istanbul Park|    Istanbul|   Turkey| 40.9517| 29.405|NULL|
+---------+-----------+--------------------+------------+---------+--------+-------+----+
only showing top 5 rows

+-------+---------+-----+----------+--------------------+
|race_id|race_year|round|circuit_id|                name|
+-------+---------+-----+----------+-------------

In [14]:
circuits_renamed_df = circuits_df_selected.withColumnRenamed("circuitId", "circuit_id")\
.withColumnRenamed("circuitRef", "circuit_ref").withColumnRenamed("lat", "latitude")\
.withColumnRenamed("lng", "longitude").withColumnRenamed("alt", "altitude")

constructor_final_df = constructor_df.withColumnRenamed("constructorId", "constructor_id")\
.withColumnRenamed("constructorRef", "constructor_ref")

drivers_with_columns_df = drivers_df.withColumnRenamed("driverId", "driver_id")\
.withColumnRenamed("driverRef", "driver_ref")\
.withColumn("name", concat(col("forename"), lit(" "), col("surname")))

results_with_columns_df = results_df.withColumnRenamed("resultId", "result_id")\
.withColumnRenamed("raceId", "race_id").withColumnRenamed("driverId", "driver_id")\
.withColumnRenamed("constructorId", "constructor_id").withColumnRenamed("positionText", "position_text")\
.withColumnRenamed("positionOrder", "position_order").withColumnRenamed("fastestLap", "fastest_lap")\
.withColumnRenamed("fastestLapTime", "fastest_lap_time").withColumnRenamed("fastestLapSpeed", "fastest_lap_speed")\

pit_final_df = pit_stops_df.withColumnRenamed("driverId", "driver_id").withColumnRenamed("raceId", "race_id")

lap_final_df = lap_times_df.withColumnRenamed("driverId", "driver_id").withColumnRenamed("raceId", "race_id")

qualify_final_df = qualifying_df.withColumnRenamed("qualifyId", "qualify_id")\
.withColumnRenamed("raceId", "race_id").withColumnRenamed("driverId", "driver_id")\
.withColumnRenamed("constructorId", "constructor_id").withColumn("ingestion_date", current_timestamp())

In [15]:
constructor_dropped_df = constructor_df.drop(col('url'))
drivers_final_df = drivers_with_columns_df.drop(col("url"))
results_final_df = results_with_columns_df.drop(col("statusId"))
circuits_renamed_df = circuits_renamed_df.drop(col("url"))

In [16]:
circuits_renamed_df.show(5)
constructor_final_df.show(5)
drivers_with_columns_df.show(5)
results_with_columns_df.columns

+----------+-----------+--------------------+------------+---------+--------+---------+--------+
|circuit_id|circuit_ref|                name|    location|  country|latitude|longitude|altitude|
+----------+-----------+--------------------+------------+---------+--------+---------+--------+
|         1|albert_park|Albert Park Grand...|   Melbourne|Australia|-37.8497|  144.968|      10|
|         2|     sepang|Sepang Internatio...|Kuala Lumpur| Malaysia| 2.76083|  101.738|    NULL|
|         3|    bahrain|Bahrain Internati...|      Sakhir|  Bahrain| 26.0325|  50.5106|    NULL|
|         4|  catalunya|Circuit de Barcel...|    Montmeló|    Spain|   41.57|  2.26111|    NULL|
|         5|   istanbul|       Istanbul Park|    Istanbul|   Turkey| 40.9517|   29.405|    NULL|
+----------+-----------+--------------------+------------+---------+--------+---------+--------+
only showing top 5 rows

+--------------+---------------+----------+-----------+--------------------+
|constructor_id|construct

['result_id',
 'race_id',
 'driver_id',
 'constructor_id',
 'number',
 'grid',
 'position',
 'position_text',
 'position_order',
 'points',
 'laps',
 'time',
 'milliseconds',
 'fastest_lap',
 'rank',
 'fastest_lap_time',
 'fastest_lap_speed',
 'statusId']

In [17]:
circuits_renamed_df.write.mode('overwrite').parquet(f"{processed_folder_path}/circuits")
races_selected_df.write.mode('overwrite').parquet(f"{processed_folder_path}/races")
constructor_final_df.write.mode('overwrite').parquet(f"{processed_folder_path}/constructors")
drivers_final_df.write.mode('overwrite').parquet(f"{processed_folder_path}/drivers")
results_final_df.write.mode('overwrite').parquet(f"{processed_folder_path}/results")
pit_final_df.write.mode('overwrite').parquet(f"{processed_folder_path}/pit_stops")
lap_final_df.write.mode('overwrite').parquet(f"{processed_folder_path}/lap_times")
qualify_final_df.write.mode('overwrite').parquet(f"{processed_folder_path}/qualifying")