<a href="https://colab.research.google.com/github/PBuenoc/f1ProjectInGoogleColab/blob/main/F1ProjectInColab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Ingestion

## Prepare spark environment

In [1]:
# Install a headless OpenJDK 8 for Spark; apt output is discarded.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# -nc skips the download when the archive is already present, so re-running this
# cell neither re-fetches it nor leaves a duplicate `.tgz.1` copy behind.
!wget -q -nc https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
# %pip (rather than !pip) installs into the environment of the running kernel.
%pip install -q findspark

In [4]:
import os

# Expose the Java and Spark locations through the process environment.
# The paths are expected to match what the setup cell installs/unpacks.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.1.2-bin-hadoop2.7",
})

In [5]:
import findspark
# Must run before the first `pyspark` import in the next cell.
# NOTE(review): presumably relies on the SPARK_HOME value set in the previous cell — confirm.
findspark.init()

In [6]:
from pyspark.sql import SparkSession

# Create the notebook-wide session on a local master, or reuse one that already exists.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName("Iniciando com Spark")
    .getOrCreate()
)

## Ingest csv files

### Ingest circuits.csv file

In [None]:
# Load the raw circuits file: ';'-separated, with a header row, letting Spark infer types.
circuits_df = (
    spark.read
    .options(header=True, inferSchema=True, sep=';')
    .csv('/content/formula1/data/raw/circuits2.csv')
)

#### Select only the columns required

In [None]:
# Keep only the columns that are carried into the processed data set.
required_columns = [
    'circuitId',
    'circuitRef',
    'name',
    'location',
    'country',
    'lat',
    'lng',
    'alt',
]
circuits_df = circuits_df.select(*required_columns)

#### Rename the columns as required

In [None]:
# Source header -> processed column name, applied in this order.
column_renames = {
    'circuitId': 'circuit_id',
    'circuitRef': 'circuit_ref',
    'lat': 'latitude',
    'lng': 'longitude',
    'alt': 'altitude',
}
for source_name, target_name in column_renames.items():
    circuits_df = circuits_df.withColumnRenamed(source_name, target_name)

#### Add ingestion_date column

In [None]:
from pyspark.sql.functions import current_timestamp

In [None]:
# Add an audit column filled by Spark's current_timestamp().
circuits_df = circuits_df.withColumn(
    'ingestion_date',
    current_timestamp(),
)

#### Specify the types as required

In [None]:
# Column -> Spark SQL type it must have in the processed output.
target_types = {
    'latitude': 'double',
    'longitude': 'double',
    'altitude': 'integer',
}
for column_name, spark_type in target_types.items():
    circuits_df = circuits_df.withColumn(column_name, circuits_df[column_name].cast(spark_type))

#### Write the data in parquet format on processed folder

In [None]:
# Persist the circuits as parquet, replacing any output from a previous run.
(
    circuits_df.write
    .mode('overwrite')
    .parquet('/content/formula1/data/processed/circuits')
)

### Ingest races.csv

In [None]:
# Load the raw races file: ';'-separated, with a header row, letting Spark infer types.
races_df = spark.read.csv(
    '/content/formula1/data/raw/races2.csv',
    sep=';',
    header=True,
    inferSchema=True,
)

#### Add the required columns

In [None]:
from pyspark.sql.functions import col, concat, lit

In [None]:
# Add the audit column and a combined "<date> <time>" column.
# NOTE(review): concat() produces a string here, so race_timestamp is written as
# text rather than as a timestamp type — confirm that is what downstream readers expect.
races_df = (
    races_df
    .withColumn('ingestion_date', current_timestamp())
    .withColumn('race_timestamp', concat(col('date'), lit(' '), col('time')))
)

#### Select only the required columns

In [None]:
# Final column list for the processed races, renaming the camelCase source columns.
selected_columns = [
    col('raceId').alias('race_id'),
    col('year').alias('race_year'),
    col('round'),
    col('circuitId').alias('circuit_id'),
    col('name'),
    col('ingestion_date'),
    col('race_timestamp'),
]
races_df = races_df.select(*selected_columns)

#### Write the data in parquet format on processed folder with partitionBy

In [None]:
# Persist the races as parquet, one partition directory per race_year,
# replacing any output from a previous run.
(
    races_df.write
    .mode('overwrite')
    .partitionBy('race_year')
    .parquet('/content/formula1/data/processed/races')
)

## Ingest JSON files

### Ingest constructors.json

In [None]:
# DDL-style schema string handed to the JSON reader in the next cell.
constructors_schema = (
    "constructorId INT, constructorRef STRING, "
    "name STRING, nationality STRING, url STRING"
)

In [None]:
# Read the constructors JSON with the explicit schema instead of inferring one.
constructors_df = (
    spark.read
    .schema(constructors_schema)
    .json('/content/formula1/data/raw/constructors.json')
)

#### Drop unwanted columns from the dataframe

In [None]:
# Remove the url column (declared in constructors_schema) from the frame.
columns_to_drop = ['url']
constructors_df = constructors_df.drop(*columns_to_drop)

#### Rename columns and add ingestion date

In [None]:
# Switch the id/ref columns to snake_case and add the audit column.
constructors_df = (
    constructors_df
    .withColumnRenamed('constructorId', 'constructor_id')
    .withColumnRenamed('constructorRef', 'constructor_ref')
    .withColumn('ingestion_date', current_timestamp())
)

In [None]:
# Persist the constructors as parquet, replacing any output from a previous run.
constructors_writer = constructors_df.write.mode('overwrite')
constructors_writer.parquet('/content/formula1/data/processed/constructors')

### Ingest drivers.json - Nested JSON

In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

In [None]:
# Schema of the nested `name` object inside each driver record.
name_schema = StructType([
    StructField("forename", StringType(), nullable=True),
    StructField("surname", StringType(), nullable=True),
])

In [None]:
# Explicit schema for drivers.json; `name` is a nested struct described by name_schema.
drivers_schema = StructType([
    StructField("driverId", IntegerType(), nullable=False),
    StructField("driverRef", StringType(), nullable=True),
    StructField("number", IntegerType(), nullable=True),
    StructField("code", StringType(), nullable=True),
    StructField("name", name_schema, nullable=True),
    StructField("dob", DateType(), nullable=True),
    StructField("nationality", StringType(), nullable=True),
    StructField("url", StringType(), nullable=True),
])

In [None]:
# Read the drivers JSON with the explicit schema instead of inferring one.
drivers_df = spark.read.json(
    '/content/formula1/data/raw/drivers.json',
    schema=drivers_schema,
)

#### Rename and add new columns

In [None]:
from pyspark.sql.functions import col, concat, lit

In [None]:
# Snake_case the id/ref columns, then overwrite the nested `name` struct with a
# single "<forename> <surname>" string.
full_name = concat(col("name.forename"), lit(" "), col("name.surname"))
drivers_df = (
    drivers_df
    .withColumnRenamed("driverId", "driver_id")
    .withColumnRenamed("driverRef", "driver_ref")
    .withColumn("name", full_name)
)

In [None]:
from pyspark.sql.functions import current_timestamp

In [None]:
# Add an audit column filled by Spark's current_timestamp().
drivers_df = drivers_df.withColumn(
    'ingestion_date',
    current_timestamp(),
)

#### Drop unwanted columns
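`url` is dropped in the cell below. \
`name.forename` and `name.surname` need no explicit drop: the nested `name` struct was already replaced by the concatenated full name in the previous step.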

In [None]:
# Only `url` is dropped here.
columns_to_drop = ['url']
drivers_df = drivers_df.drop(*columns_to_drop)

#### Write the output to processed folder in parquet format


In [None]:
# Persist the drivers as parquet, replacing any output from a previous run.
(
    drivers_df.write
    .mode('overwrite')
    .parquet('/content/formula1/data/processed/drivers')
)

### Ingest results.json

In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

In [None]:
# (column name, Spark type, nullable) for every field read from results.json.
result_field_specs = [
    ("resultId", IntegerType(), False),
    ("raceId", IntegerType(), True),
    ("driverId", IntegerType(), True),
    ("constructorId", IntegerType(), True),
    ("number", IntegerType(), True),
    ("grid", IntegerType(), True),
    ("position", IntegerType(), True),
    ("positionText", StringType(), True),
    ("positionOrder", IntegerType(), True),
    ("points", FloatType(), True),
    ("laps", IntegerType(), True),
    ("time", StringType(), True),
    ("milliseconds", IntegerType(), True),
    ("fastestLap", IntegerType(), True),
    ("rank", IntegerType(), True),
    ("fastestLapTime", StringType(), True),
    ("fastestLapSpeed", FloatType(), True),
    ("statusId", StringType(), True),
]
results_schema = StructType([
    StructField(field_name, field_type, is_nullable)
    for field_name, field_type, is_nullable in result_field_specs
])

In [None]:
# Read the results JSON with the explicit schema instead of inferring one.
results_df = (
    spark.read
    .schema(results_schema)
    .json('/content/formula1/data/raw/results.json')
)

+--------+------+--------+-------------+------+----+--------+------------+-------------+------+----+-----------+------------+----------+----+--------------+---------------+--------+
|resultId|raceId|driverId|constructorId|number|grid|position|positionText|positionOrder|points|laps|       time|milliseconds|fastestLap|rank|fastestLapTime|fastestLapSpeed|statusId|
+--------+------+--------+-------------+------+----+--------+------------+-------------+------+----+-----------+------------+----------+----+--------------+---------------+--------+
|       1|    18|       1|            1|    22|   1|       1|           1|            1|  10.0|  58|1:34:50.616|     5690616|        39|   2|      1:27.452|          218.3|       1|
|       2|    18|       2|            2|     3|   5|       2|           2|            2|   8.0|  58|     +5.478|     5696094|        41|   3|      1:27.739|        217.586|       1|
|       3|    18|       3|            3|     7|   7|       3|           3|            3|  

#### Drop, rename and add required columns

In [None]:
# statusId is not carried into the processed results.
columns_to_drop = ['statusId']
results_df = results_df.drop(*columns_to_drop)

In [None]:
# camelCase source name -> snake_case processed name, applied in this order.
column_renames = {
    'resultId': 'result_id',
    'raceId': 'race_id',
    'driverId': 'driver_id',
    'constructorId': 'constructor_id',
    'positionText': 'position_text',
    'positionOrder': 'position_order',
    'fastestLap': 'fastest_lap',
    'fastestLapTime': 'fastest_lap_time',
    'fastestLapSpeed': 'fastest_lap_speed',
}
for source_name, target_name in column_renames.items():
    results_df = results_df.withColumnRenamed(source_name, target_name)

In [None]:
from pyspark.sql.functions import current_timestamp

In [None]:
# Add an audit column filled by Spark's current_timestamp().
results_df = results_df.withColumn(
    'ingestion_date',
    current_timestamp(),
)

+---------+-------+---------+--------------+------+----+--------+-------------+--------------+------+----+-----------+------------+-----------+----+----------------+-----------------+--------------------+
|result_id|race_id|driver_id|constructor_id|number|grid|position|position_text|position_order|points|laps|       time|milliseconds|fastest_lap|rank|fastest_lap_time|fastest_lap_speed|      ingestion_date|
+---------+-------+---------+--------------+------+----+--------+-------------+--------------+------+----+-----------+------------+-----------+----+----------------+-----------------+--------------------+
|        1|     18|        1|             1|    22|   1|       1|            1|             1|  10.0|  58|1:34:50.616|     5690616|         39|   2|        1:27.452|            218.3|2023-03-16 01:43:...|
|        2|     18|        2|             2|     3|   5|       2|            2|             2|   8.0|  58|     +5.478|     5696094|         41|   3|        1:27.739|          217.5

#### Write the output to processed folder in parquet format

In [None]:
# Persist the results as parquet, one partition directory per race_id,
# replacing any output from a previous run.
(
    results_df.write
    .mode('overwrite')
    .partitionBy('race_id')
    .parquet('/content/formula1/data/processed/results')
)


### Ingest pit_stops.json

In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

In [None]:
# Explicit schema for pit_stops.json, built field by field.
pit_stops_schema = (
    StructType()
    .add("raceId", IntegerType(), False)
    .add("driverId", IntegerType(), True)
    .add("stop", StringType(), True)
    .add("lap", IntegerType(), True)
    .add("time", StringType(), True)
    .add("duration", StringType(), True)
    .add("milliseconds", IntegerType(), True)
)

In [None]:
# Read the pit stops JSON with the explicit schema. The reader is put in
# multi-line mode, i.e. a JSON record may span several lines of the file.
# The option is spelled `multiLine` (Spark's documented name) to match the
# qualifying reader further down; Spark treats option keys case-insensitively,
# so this does not change behaviour.
pit_stops_df = (
    spark.read
    .schema(pit_stops_schema)
    .option('multiLine', True)
    .json('/content/formula1/data/raw/pit_stops.json')
)

#### Rename and add columns as required

In [None]:
# Switch the id columns to snake_case.
pit_stops_df = (
    pit_stops_df
    .withColumnRenamed('raceId', 'race_id')
    .withColumnRenamed('driverId', 'driver_id')
)

In [None]:
# Add an audit column filled by Spark's current_timestamp().
pit_stops_df = pit_stops_df.withColumn(
    'ingestion_date',
    current_timestamp(),
)

#### Write the output to processed folder in parquet format

In [None]:
# Persist the pit stops as parquet, replacing any output from a previous run.
(
    pit_stops_df.write
    .mode("overwrite")
    .parquet('/content/formula1/data/processed/pit_stops')
)

## Ingest multiple files

### Ingest lap_times folder

In [9]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

In [34]:
# Explicit schema for the CSV files in the lap_times folder.
lap_times_schema = StructType([
    StructField('raceId', IntegerType(), nullable=False),
    StructField('driverId', IntegerType(), nullable=False),
    StructField('lap', IntegerType(), nullable=False),
    StructField('position', IntegerType(), nullable=True),
    StructField('time', StringType(), nullable=True),
    StructField('milliseconds', IntegerType(), nullable=True),
])

In [35]:
# Read the lap_times folder as CSV using the explicit schema.
lap_times_df = spark.read.csv(
    '/content/formula1/data/raw/lap_times',
    schema=lap_times_schema,
)

In [36]:
from pyspark.sql.functions import current_timestamp

In [37]:
# Switch the id columns to snake_case, then add the audit column.
column_renames = {
    'raceId': 'race_id',
    'driverId': 'driver_id',
}
for source_name, target_name in column_renames.items():
    lap_times_df = lap_times_df.withColumnRenamed(source_name, target_name)
lap_times_df = lap_times_df.withColumn('ingestion_date', current_timestamp())

In [38]:
# Persist the lap times as parquet, replacing any output from a previous run.
lap_times_writer = lap_times_df.write.mode('overwrite')
lap_times_writer.parquet('/content/formula1/data/processed/lap_times')

### Ingest qualifying files - multiple multi-line JSON files

In [55]:
# (column name, Spark type, nullable) for every field read from the qualifying files.
qualifying_field_specs = [
    ('qualifyId', IntegerType(), False),
    ('raceId', IntegerType(), True),
    ('driverId', IntegerType(), True),
    ('constructorId', IntegerType(), True),
    ('number', IntegerType(), True),
    ('position', IntegerType(), True),
    ('q1', StringType(), True),
    ('q2', StringType(), True),
    ('q3', StringType(), True),
]
qualifying_schema = StructType([
    StructField(field_name, field_type, is_nullable)
    for field_name, field_type, is_nullable in qualifying_field_specs
])

In [59]:
# Read every file under the qualifying folder as multi-line JSON with the explicit schema.
qualifying_df = (
    spark.read
    .schema(qualifying_schema)
    .option('multiLine', True)
    .json('/content/formula1/data/raw/qualifying/*')
)

In [60]:
# Switch the id columns to snake_case and add the audit column.
qualifying_df = (
    qualifying_df
    .withColumnRenamed('qualifyId', 'qualify_id')
    .withColumnRenamed('raceId', 'race_id')
    .withColumnRenamed('driverId', 'driver_id')
    .withColumnRenamed('constructorId', 'constructor_id')
    .withColumn('ingestion_date', current_timestamp())
)

In [62]:
# Persist the qualifying data as parquet, replacing any output from a previous run.
(
    qualifying_df.write
    .mode('overwrite')
    .parquet('/content/formula1/data/processed/qualifying')
)