In [1]:
from pyspark.sql import SparkSession

# Local Spark session with a single worker thread (local[1]); every cell below reads and
# writes through this `spark` handle.
spark = (
    SparkSession.builder
    .master('local[1]')
    .appName('F1-Racing-DE')
    .getOrCreate()
)

23/05/09 21:37:07 WARN Utils: Your hostname, MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.1.6 instead (on interface en0)
23/05/09 21:37:07 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/05/09 21:37:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/05/09 21:37:07 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/05/09 21:37:07 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
23/05/09 21:37:07 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.


### Circuits

In [2]:
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, DoubleType
from pyspark.sql.functions import current_timestamp, col

In [3]:
# Explicit CSV schema as (name, type, nullable) rows; only the circuitId key is declared
# non-nullable.
fields = [
    StructField(name, data_type, nullable)
    for name, data_type, nullable in [
        ('circuitId',  IntegerType(), False),
        ('circuitRef', StringType(),  True),
        ('name',       StringType(),  True),
        ('location',   StringType(),  True),
        ('country',    StringType(),  True),
        ('lat',        DoubleType(),  True),
        ('lng',        DoubleType(),  True),
        ('alt',        IntegerType(), True),
        ('url',        StringType(),  True),
    ]
]

circuits_schema = StructType(fields=fields)

In [4]:
# First line is the header; column names and types come from circuits_schema (no inference).
circuits_df = spark.read.csv('data/circuits.csv', header=True, schema=circuits_schema)

In [5]:
# Rename the raw camelCase / abbreviated columns to snake_case, descriptive names.
circuits_renamed_df = (
    circuits_df
    .withColumnRenamed('circuitId', 'circuit_id')
    .withColumnRenamed('circuitRef', 'circuit_ref')
    .withColumnRenamed('lat', 'latitude')
    .withColumnRenamed('lng', 'longitude')
    .withColumnRenamed('alt', 'altitude')
)

In [6]:
# Record when the row was loaded, then remove the url column from the processed output.
circuits_ingestion_df = (
    circuits_renamed_df
    .withColumn('ingestion_date', current_timestamp())
)
circuits_final_df = circuits_ingestion_df.drop('url')

In [7]:
circuits_final_df.write.parquet('data/processed/circuits', mode='overwrite')  # replaces any previous run

                                                                                

### Races

In [8]:
from pyspark.sql.types import StructField, StructType, IntegerType, StringType, DateType
from pyspark.sql.functions import (
    current_timestamp,
    col,
    when,
    lit,
    concat,
    to_timestamp
)

In [9]:
# Explicit CSV schema as (name, type, nullable) rows. `date` and `time` stay strings because
# they are concatenated into race_timestamp further down; the session dates are parsed as dates.
fields = [
    StructField(name, data_type, nullable)
    for name, data_type, nullable in [
        ('raceId',      IntegerType(), False),
        ('year',        IntegerType(), True),
        ('round',       IntegerType(), True),
        ('circuitId',   IntegerType(), True),
        ('name',        StringType(),  True),
        ('date',        StringType(),  True),
        ('time',        StringType(),  True),
        ('url',         StringType(),  True),
        ('fp1_date',    DateType(),    True),
        ('fp1_time',    StringType(),  True),
        ('fp2_date',    DateType(),    True),
        ('fp2_time',    StringType(),  True),
        ('fp3_date',    DateType(),    True),
        ('fp3_time',    StringType(),  True),
        ('quali_date',  DateType(),    True),
        ('quali_time',  StringType(),  True),
        ('sprint_date', DateType(),    True),
        ('sprint_time', StringType(),  True),
    ]
]

races_schema = StructType(fields=fields)

In [10]:
# First line is the header; column names and types come from races_schema (no inference).
races_df = spark.read.csv('data/races.csv', header=True, schema=races_schema)

In [11]:
# Replace the literal '\N' missing-value marker with a real null.
# Only string columns can hold that literal, so non-string columns (the IntegerType keys and
# the DateType session dates) pass through untouched. Comparing those with a string would
# require an implicit cast, which can raise an error when spark.sql.ansi.enabled is true.
# A single select() replaces the per-column withColumn() loop, keeping the plan small and
# the column order unchanged.
races_df = races_df.select([
    when(col(name) == '\\N', None).otherwise(col(name)).alias(name)
    if dtype == 'string' else col(name)
    for name, dtype in races_df.dtypes
])

In [12]:
# Snake_case the key columns; `year` becomes `race_year`, the partition column used on write.
races_renamed_df = (
    races_df
    .withColumnRenamed('raceId', 'race_id')
    .withColumnRenamed('year', 'race_year')
    .withColumnRenamed('circuitId', 'circuit_id')
)

In [13]:
from pyspark.sql.functions import to_date

# 'yyyy-MM-dd HH:mm:ss' string built from the separate date and time columns.
# concat() returns null if any input is null, so race_timestamp is null for every race
# whose `time` is missing.
ts_column = concat(col('date'), lit(' '), col('time'))

race_final_df = (
    races_renamed_df
    .withColumn('race_timestamp', to_timestamp(ts_column, 'yyyy-MM-dd HH:mm:ss'))
    # `date` is dropped below. Without this column, races with no recorded start time
    # (null race_timestamp) would lose their calendar date entirely.
    .withColumn('race_date', to_date(col('date'), 'yyyy-MM-dd'))
    .withColumn('ingestion_date', current_timestamp())
    .drop('date', 'time', 'url')
)

In [14]:
# One output directory per race_year value; replaces any previous run.
race_final_df.write.parquet('data/processed/races', mode='overwrite', partitionBy='race_year')

                                                                                

### Constructors

In [15]:
from pyspark.sql.types import StructField, StructType, StringType
from pyspark.sql.functions import current_timestamp

In [16]:
# This section's import cell brings in only StructField, StructType and StringType.
# Import IntegerType here so the cell does not silently depend on an earlier section.
from pyspark.sql.types import IntegerType

# Explicit CSV schema as (name, type, nullable) rows; only the key is declared non-nullable.
fields = [
    StructField(name, data_type, nullable)
    for name, data_type, nullable in [
        ('constructorId',  IntegerType(), False),
        ('constructorRef', StringType(),  True),
        ('name',           StringType(),  True),
        ('nationality',    StringType(),  True),
        ('url',            StringType(),  True),
    ]
]

constructors_schema = StructType(fields=fields)

In [17]:
# First line is the header; column names and types come from constructors_schema.
constructors_df = spark.read.csv('data/constructors.csv', header=True, schema=constructors_schema)

In [18]:
# Snake_case the key columns, stamp the load time and drop url.
# withColumnRenamed is a no-op for names it cannot resolve, so the source names must match
# constructors_schema exactly. The target name follows the *_id / *_ref convention.
constructors_final_df = (
    constructors_df
    .withColumnRenamed('constructorId', 'constructor_id')
    .withColumnRenamed('constructorRef', 'constructor_ref')
    .withColumn('ingestion_date', current_timestamp())
    .drop('url')
)

In [19]:
constructors_final_df.write.parquet('data/processed/constructors', mode='overwrite')  # replaces any previous run

### Drivers

In [20]:
from pyspark.sql.types import StructField, StructType, StringType, IntegerType
from pyspark.sql.functions import current_timestamp, concat, col, lit

In [21]:
# Explicit CSV schema as (name, type, nullable) rows; only the key is declared non-nullable.
fields = [
    StructField(name, data_type, nullable)
    for name, data_type, nullable in [
        ('driverId',    IntegerType(), False),
        ('driverRef',   StringType(),  True),
        ('number',      IntegerType(), True),
        # Driver codes are letter abbreviations (e.g. 'HAM'), not numbers. With IntegerType
        # every value fails to parse and becomes null. Verify against data/drivers.csv.
        ('code',        StringType(),  True),
        ('forename',    StringType(),  True),
        ('surname',     StringType(),  True),
        ('dob',         StringType(),  True),
        ('nationality', StringType(),  True),
        ('url',         StringType(),  True),
    ]
]

drivers_schema = StructType(fields=fields)

In [22]:
# First line is the header; column names and types come from drivers_schema.
# The other CSVs in this notebook mark missing values with the literal '\N'; presumably
# drivers.csv does too (TODO confirm). Declaring it as nullValue turns it into null at parse
# time. Typed columns such as `number` then no longer rely on parse-failure nulling, and
# string columns never carry the '\N' literal.
drivers_df = (
    spark.read
    .option('header', True)
    .option('nullValue', '\\N')
    .schema(drivers_schema)
    .format('csv')
    .load('data/drivers.csv')
)

In [23]:
# Snake_case the identifier columns.
drivers_renamed_df = (
    drivers_renamed_df
    if False else
    drivers_df
    .withColumnRenamed('driverId', 'driver_id')
    .withColumnRenamed('driverRef', 'driver_ref')
) if False else (
    drivers_df
    .withColumnRenamed('driverId', 'driver_id')
    .withColumnRenamed('driverRef', 'driver_ref')
)

In [24]:
# Build a single display `name` as "forename surname" (null if either part is null, per
# concat semantics) and add an ingestion_date column from current_timestamp().
full_name = concat(col('forename'), lit(' '), col('surname'))
drivers_transformed_df = (
    drivers_renamed_df
    .withColumn('name', full_name)
    .withColumn('ingestion_date', current_timestamp())
)

In [25]:
# Fix the output column order. The explicit projection also leaves out forename, surname
# and url, so no separate drop() is needed.
drivers_final_df = drivers_transformed_df.select(
    'driver_id',
    'driver_ref',
    'name',
    'number',
    'code',
    'dob',
    'nationality',
    'ingestion_date',
)

In [26]:
drivers_final_df.write.parquet('data/processed/drivers', mode='overwrite')  # replaces any previous run

### Results

In [27]:
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, FloatType
from pyspark.sql.functions import col, when, current_timestamp

In [28]:
# Explicit CSV schema as (name, type, nullable) rows; only the key is declared non-nullable.
fields = [
    StructField(name, data_type, nullable)
    for name, data_type, nullable in [
        ('resultId',        IntegerType(), False),
        ('raceId',          IntegerType(), True),
        ('driverId',        IntegerType(), True),
        ('constructorId',   IntegerType(), True),
        ('number',          IntegerType(), True),
        ('grid',            IntegerType(), True),
        ('position',        IntegerType(), True),
        # Textual position: besides digits it is expected to hold status codes such as 'R'
        # (retired). IntegerType would null those values. Verify against data/results.csv.
        ('positionText',    StringType(),  True),
        ('positionOrder',   IntegerType(), True),
        # Points can be fractional (e.g. half points); IntegerType would null such rows.
        ('points',          FloatType(),   True),
        ('laps',            IntegerType(), True),
        ('time',            StringType(),  True),
        ('milliseconds',    IntegerType(), True),
        ('fastestLap',      IntegerType(), True),
        ('rank',            IntegerType(), True),
        ('fastestLapTime',  StringType(),  True),
        ('fastestLapSpeed', FloatType(),   True),
        ('statusId',        IntegerType(), True),
    ]
]

results_schema = StructType(fields=fields)

In [29]:
# First line is the header; column names and types come from results_schema.
results_df = spark.read.csv('data/results.csv', header=True, schema=results_schema)

In [30]:
# Replace the literal '\N' missing-value marker with a real null.
# Only string columns can hold that literal, so numeric columns pass through untouched.
# Comparing them with a string would need an implicit cast, which can raise an error when
# spark.sql.ansi.enabled is true. A single select() replaces the per-column withColumn()
# loop, keeping the plan small and the column order unchanged.
results_df = results_df.select([
    when(col(name) == '\\N', None).otherwise(col(name)).alias(name)
    if dtype == 'string' else col(name)
    for name, dtype in results_df.dtypes
])

In [31]:
# camelCase -> snake_case for every multi-word source column.
results_renamed_df = (
    results_df
    .withColumnRenamed('resultId', 'result_id')
    .withColumnRenamed('raceId', 'race_id')
    .withColumnRenamed('driverId', 'driver_id')
    .withColumnRenamed('constructorId', 'constructor_id')
    .withColumnRenamed('positionText', 'position_text')
    .withColumnRenamed('positionOrder', 'position_order')
    .withColumnRenamed('fastestLap', 'fastest_lap')
    .withColumnRenamed('fastestLapTime', 'fastest_lap_time')
    .withColumnRenamed('fastestLapSpeed', 'fastest_lap_speed')
)

In [32]:
# Stamp the load time and leave statusId out of the processed table.
results_final_df = (
    results_renamed_df
    .withColumn('ingestion_date', current_timestamp())
    .drop('statusId')
)

In [33]:
# One output directory per race_id value; replaces any previous run.
# NOTE(review): race_id has a distinct value per race, so this yields many small partitions.
# Consider whether a coarser key was intended.
results_final_df.write.parquet('data/processed/results', mode='overwrite', partitionBy='race_id')

                                                                                

### Pitstops

In [34]:
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, FloatType
from pyspark.sql.functions import current_timestamp

In [35]:
# Explicit CSV schema as (name, type, nullable) rows; only raceId is declared non-nullable.
fields = [
    StructField(name, data_type, nullable)
    for name, data_type, nullable in [
        ('raceId',       IntegerType(), False),
        ('driverId',     IntegerType(), True),
        ('stop',         IntegerType(), True),
        ('lap',          IntegerType(), True),
        ('time',         StringType(),  True),
        # Kept as text: very long stops (e.g. under a red flag) are expected to appear as
        # 'mm:ss.sss', which FloatType cannot parse and would null. The numeric value is
        # in `milliseconds`. Verify against data/pit_stops.csv.
        ('duration',     StringType(),  True),
        ('milliseconds', IntegerType(), True),
    ]
]

pitstops_schema = StructType(fields=fields)

In [36]:
# First line is the header; column names and types come from pitstops_schema.
pitstops_df = spark.read.csv('data/pit_stops.csv', header=True, schema=pitstops_schema)

In [37]:
# Snake_case the foreign-key columns.
pitstops_renamed_df = (
    pitstops_df
    .withColumnRenamed('raceId', 'race_id')
    .withColumnRenamed('driverId', 'driver_id')
)

In [38]:
# Add the load time as ingestion_date.
pitstops_final_df = (
    pitstops_renamed_df
    .withColumn('ingestion_date', current_timestamp())
)

In [39]:
pitstops_final_df.write.parquet('data/processed/pitstops', mode='overwrite')  # replaces any previous run

### Laptimes

In [40]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import current_timestamp

In [41]:
# Explicit CSV schema as (name, type, nullable) rows; only raceId is declared non-nullable.
fields = [
    StructField(name, data_type, nullable)
    for name, data_type, nullable in [
        ('raceId',       IntegerType(), False),
        ('driverId',     IntegerType(), True),
        ('lap',          IntegerType(), True),
        ('position',     IntegerType(), True),
        ('time',         StringType(),  True),
        ('milliseconds', IntegerType(), True),
    ]
]

laptimes_schema = StructType(fields=fields)

In [42]:
# First line is the header; column names and types come from laptimes_schema.
laptimes_df = spark.read.csv('data/lap_times.csv', header=True, schema=laptimes_schema)

In [43]:
# Snake_case the foreign-key columns.
laptimes_renamed_df = (
    laptimes_df
    .withColumnRenamed('raceId', 'race_id')
    .withColumnRenamed('driverId', 'driver_id')
)

In [44]:
# Add the load time as ingestion_date.
laptimes_final_df = (
    laptimes_renamed_df
    .withColumn('ingestion_date', current_timestamp())
)

In [45]:
laptimes_final_df.write.parquet('data/processed/laptimes', mode='overwrite')  # replaces any previous run

                                                                                

### Qualifying

In [46]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import current_timestamp, col, when

In [47]:
# Explicit CSV schema as (name, type, nullable) rows; only the key is declared non-nullable.
# Session times q1..q3 are kept as raw strings.
fields = [
    StructField(name, data_type, nullable)
    for name, data_type, nullable in [
        ('qualifyId',     IntegerType(), False),
        ('raceId',        IntegerType(), True),
        ('driverId',      IntegerType(), True),
        ('constructorId', IntegerType(), True),
        ('number',        IntegerType(), True),
        ('position',      IntegerType(), True),
        ('q1',            StringType(),  True),
        ('q2',            StringType(),  True),
        ('q3',            StringType(),  True),
    ]
]

qualifying_schema = StructType(fields=fields)

In [48]:
# First line is the header; column names and types come from qualifying_schema.
qualifying_df = spark.read.csv('data/qualifying.csv', header=True, schema=qualifying_schema)

In [49]:
# Replace the literal '\N' missing-value marker with a real null.
# Only string columns (here q1..q3) can hold that literal, so numeric columns pass through
# untouched. Comparing them with a string would need an implicit cast, which can raise an
# error when spark.sql.ansi.enabled is true. A single select() replaces the per-column
# withColumn() loop, keeping the plan small and the column order unchanged.
qualifying_df = qualifying_df.select([
    when(col(name) == '\\N', None).otherwise(col(name)).alias(name)
    if dtype == 'string' else col(name)
    for name, dtype in qualifying_df.dtypes
])

In [50]:
# Snake_case the key columns.
# withColumnRenamed is a no-op for names it cannot resolve, so the source name must match
# qualifying_schema's `qualifyId` exactly; `qualifyingId` would leave the column unrenamed.
qualifying_renamed_df = (
    qualifying_df
    .withColumnRenamed('qualifyId', 'qualifying_id')
    .withColumnRenamed('raceId', 'race_id')
    .withColumnRenamed('driverId', 'driver_id')
    .withColumnRenamed('constructorId', 'constructor_id')
)

In [51]:
# Add the load time as ingestion_date.
qualifying_final_df = (
    qualifying_renamed_df
    .withColumn('ingestion_date', current_timestamp())
)

In [52]:
qualifying_final_df.write.parquet('data/processed/qualifying', mode='overwrite')  # replaces any previous run