In [15]:
from pyspark.sql.functions import col, lit, concat, explode
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, DateType, ArrayType

In [None]:
spark = SparkSession.builder \
    .appName("F1 Data Pipeline") \
    .config("spark.ui.port", "4040") \
    .config("spark.jars", "file:///C:/spark/spark-3.5.1-bin-hadoop3/jars/gcs-connector-hadoop3-latest.jar") \
    .getOrCreate()

# Set GCS authentication and filesystem implementation
spark.conf.set("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
spark.conf.set("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
spark.conf.set("google.cloud.auth.service.account.enable", "true")
spark.conf.set("google.cloud.auth.service.account.json.keyfile", "path/to/service-account-file.json")


In [17]:
# Define the schema
location_schema = StructType(fields=[StructField("lat", StringType(), True),
                                     StructField("long", StringType(), True),
                                     StructField("locality", StringType(), True),
                                     StructField("country", StringType(), True)
])

circuits_schema = StructType(fields=[StructField("circuitId", StringType(), False),
                                     StructField("url", StringType(), True),
                                     StructField("circuitName", StringType(), True),
                                     StructField("Location", location_schema),  # Nested schema
])

drivers_schema = StructType(fields=[StructField("driverId", StringType(), False),
                                    StructField("permanentNumber", StringType(), True),
                                    StructField("code", StringType(), True),
                                    StructField("url", StringType(), True),
                                    StructField("givenName", StringType(), True),
                                    StructField("familyName", StringType(), True),
                                    StructField("dateOfBirth", StringType(), True),
                                    StructField("nationality", StringType(), True)
])

constructors_schema = StructType(fields=[StructField("constructorId", StringType(), False),
                                     StructField("url", StringType(), True),
                                     StructField("name", StringType(), True),
                                     StructField("nationality", StringType(), True)
])

time_schema = StructType(fields=[StructField("millis", StringType(), False),
                                StructField("time", StringType(), True),
])

results_schema = StructType([StructField("season", StringType(), False),
                             StructField("round", StringType(), True),
                             StructField("url", StringType(), True),
                             StructField("raceName", StringType(), True),
                             StructField("Circuit", circuits_schema),  # Nested schema
                             StructField("date", StringType(), True),
                             StructField("Results", ArrayType(StructType([StructField("number", StringType(), False),
                                                                          StructField("position", StringType(), True),
                                                                          StructField("positionText", StringType(), True),
                                                                          StructField("points", StringType(), True),
                                                                          StructField("Driver", drivers_schema),  # Nested schema
                                                                          StructField("Constructor", constructors_schema),  # Nested schema,
                                                                          StructField("grid", StringType(), True),
                                                                          StructField("laps", StringType(), True),
                                                                          StructField("status", StringType(), True),
                                                                          StructField("Time", time_schema),  # Nested schema
                                                                          ])), True)
])

In [18]:
results_df = spark.read \
.schema(results_schema) \
.option("multiLine", True) \
.json("gs://f1-gcp/raw/results.json")

In [19]:
results_df.printSchema()

root
 |-- season: string (nullable = true)
 |-- round: string (nullable = true)
 |-- url: string (nullable = true)
 |-- raceName: string (nullable = true)
 |-- Circuit: struct (nullable = true)
 |    |-- circuitId: string (nullable = true)
 |    |-- url: string (nullable = true)
 |    |-- circuitName: string (nullable = true)
 |    |-- Location: struct (nullable = true)
 |    |    |-- lat: string (nullable = true)
 |    |    |-- long: string (nullable = true)
 |    |    |-- locality: string (nullable = true)
 |    |    |-- country: string (nullable = true)
 |-- date: string (nullable = true)
 |-- Results: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- number: string (nullable = true)
 |    |    |-- position: string (nullable = true)
 |    |    |-- positionText: string (nullable = true)
 |    |    |-- points: string (nullable = true)
 |    |    |-- Driver: struct (nullable = true)
 |    |    |    |-- driverId: string (nullable = true)
 |    |    |

In [20]:
results_df.show(truncate=False)

+------+-----+-----------------------------------------------------+------------------+---------------------------------------------------------------------------------------------------------------------------------------------+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [21]:
# Step 4: Explode the "Drivers" array into multiple rows. Also .explode() only work with ArrayType or MapType columns
results_exploded_df = results_df.select(col("season").cast(IntegerType()).alias("season"),
                                        col("round").cast(IntegerType()).alias("round"),
                                        col("url").alias("race_url"),
                                        col("raceName").alias("race_name"),
                                        col("Circuit.circuitId").alias("circuit_id"),
                                        col("Circuit.url").alias("circuit_url"),
                                        col("Circuit.circuitName").alias("circuit_name"),
                                        col("Circuit.Location.lat").alias("circuit_latitude"),
                                        col("Circuit.Location.long").alias("circuit_longitude"),
                                        col("Circuit.Location.locality").alias("circuit_locality"),
                                        col("Circuit.Location.country").alias("circuit_country"),
                                        col("date").cast(DateType()).alias("race_date"),
                                        explode("Results").alias("Results")
                                        )

results_exploded_df.show(truncate=False)

+------+-----+-----------------------------------------------------+------------------+-----------+-------------------------------------------------+-------------------+----------------+-----------------+----------------+---------------+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|season|round|race_url                                             |race_name         |circuit_id |circuit_url                                      |circuit_name       |circuit_latitude|circuit_longitude|circuit_locality|circuit_country|race_date |Results                                                                                                                                                                                                                                     |
+------+-----+--------------

In [22]:
# Cast the columns
results_cast_df = results_exploded_df.withColumn("car_number", col("Results.number").cast(IntegerType())) \
                                    .withColumn("position", col("Results.position").cast(IntegerType())) \
                                    .withColumn("position_text", col("Results.positionText")) \
                                    .withColumn("points", col("Results.points").cast(IntegerType())) \
                                    .withColumn("driver_id", col("Results.Driver.driverId")) \
                                    .withColumn("driver_url", col("Results.Driver.url")) \
                                    .withColumn("driver_name", concat(col("Results.Driver.givenName"), lit(" "), col("Results.Driver.familyName")))\
                                    .withColumn("driver_date_of_birth", col("Results.Driver.dateOfBirth").cast(DateType())) \
                                    .withColumn("driver_nationality", col("Results.Driver.nationality")) \
                                    .withColumn("constructor_Id", col("Results.Constructor.constructorId")) \
                                    .withColumn("constructor_url", col("Results.Constructor.url")) \
                                    .withColumn("constructor_name", col("Results.Constructor.name"))\
                                    .withColumn("constructor_nationality", col("Results.Constructor.nationality")) \
                                    .withColumn("grid", col("Results.grid").cast(IntegerType())) \
                                    .withColumn("laps", col("Results.laps").cast(IntegerType())) \
                                    .withColumn("status", col("Results.status")) \
                                    .withColumn("millis", col("Results.Time.millis").cast(IntegerType())) \
                                    .withColumn("time", col("Results.Time.time")) \
                                    .drop("Results")
results_cast_df.show(truncate=False)

+------+-----+-----------------------------------------------------+------------------+-----------+-------------------------------------------------+-------------------+----------------+-----------------+----------------+---------------+----------+----------+--------+-------------+------+------------+----------------------------------------------------+--------------------+--------------------+------------------+--------------+-------------------------------------------------------+----------------+-----------------------+----+----+--------------+-------+-----------+
|season|round|race_url                                             |race_name         |circuit_id |circuit_url                                      |circuit_name       |circuit_latitude|circuit_longitude|circuit_locality|circuit_country|race_date |car_number|position|position_text|points|driver_id   |driver_url                                          |driver_name         |driver_date_of_birth|driver_nationality|constructo

In [23]:
#  use .drop() to remove columns that are not needed
results_final_df = results_cast_df.drop(col("race_url"), col("circuit_url"), col("driver_url"), col("constructor_url"))
results_final_df.show(truncate=False)

+------+-----+------------------+-----------+-------------------+----------------+-----------------+----------------+---------------+----------+----------+--------+-------------+------+------------+--------------------+--------------------+------------------+--------------+----------------+-----------------------+----+----+--------------+-------+-----------+
|season|round|race_name         |circuit_id |circuit_name       |circuit_latitude|circuit_longitude|circuit_locality|circuit_country|race_date |car_number|position|position_text|points|driver_id   |driver_name         |driver_date_of_birth|driver_nationality|constructor_Id|constructor_name|constructor_nationality|grid|laps|status        |millis |time       |
+------+-----+------------------+-----------+-------------------+----------------+-----------------+----------------+---------------+----------+----------+--------+-------------+------+------------+--------------------+--------------------+------------------+--------------+----

In [24]:
#  use .select() to select columns that are needed
results_final_df = results_cast_df.select(col("season"), col("round"), col("race_name"), col("circuit_id"), col("constructor_id"), col("driver_id"), col("car_number"), col("position"), col("position_text"), col("points"), col("grid"), col("laps"), col("status"), col("millis"), col("time"), col("race_date"))
results_final_df.show(truncate=False)

+------+-----+------------------+-----------+--------------+------------+----------+--------+-------------+------+----+----+--------------+-------+-----------+----------+
|season|round|race_name         |circuit_id |constructor_id|driver_id   |car_number|position|position_text|points|grid|laps|status        |millis |time       |race_date |
+------+-----+------------------+-----------+--------------+------------+----------+--------+-------------+------+----+----+--------------+-------+-----------+----------+
|1950  |1    |British Grand Prix|silverstone|alfa          |farina      |2         |1       |1            |9     |1   |70  |Finished      |8003600|2:13:23.600|1950-05-13|
|1950  |1    |British Grand Prix|silverstone|alfa          |fagioli     |3         |2       |2            |6     |2   |70  |Finished      |8006200|+2.600     |1950-05-13|
|1950  |1    |British Grand Prix|silverstone|alfa          |reg_parnell |4         |3       |3            |4     |4   |70  |Finished      |805560

In [25]:
results_final_df.printSchema()

root
 |-- season: integer (nullable = true)
 |-- round: integer (nullable = true)
 |-- race_name: string (nullable = true)
 |-- circuit_id: string (nullable = true)
 |-- constructor_id: string (nullable = true)
 |-- driver_id: string (nullable = true)
 |-- car_number: integer (nullable = true)
 |-- position: integer (nullable = true)
 |-- position_text: string (nullable = true)
 |-- points: integer (nullable = true)
 |-- grid: integer (nullable = true)
 |-- laps: integer (nullable = true)
 |-- status: string (nullable = true)
 |-- millis: integer (nullable = true)
 |-- time: string (nullable = true)
 |-- race_date: date (nullable = true)



In [26]:
results_final_df.write.mode("overwrite").format("parquet").save("gs://f1-gcp/processed/results/")

In [27]:
spark.read.parquet("gs://f1-gcp/processed/results/").show()

+------+-----+------------------+-----------+--------------+------------+----------+--------+-------------+------+----+----+--------------+-------+-----------+----------+
|season|round|         race_name| circuit_id|constructor_id|   driver_id|car_number|position|position_text|points|grid|laps|        status| millis|       time| race_date|
+------+-----+------------------+-----------+--------------+------------+----------+--------+-------------+------+----+----+--------------+-------+-----------+----------+
|  1950|    1|British Grand Prix|silverstone|          alfa|      farina|         2|       1|            1|     9|   1|  70|      Finished|8003600|2:13:23.600|1950-05-13|
|  1950|    1|British Grand Prix|silverstone|          alfa|     fagioli|         3|       2|            2|     6|   2|  70|      Finished|8006200|     +2.600|1950-05-13|
|  1950|    1|British Grand Prix|silverstone|          alfa| reg_parnell|         4|       3|            3|     4|   4|  70|      Finished|805560

In [28]:
spark.stop()