In [0]:
circuits_df = spark.read.parquet('/mnt/formula1storageedu/processed/circuits/') \
        .withColumnRenamed('location', 'circuit_location')

constructors_df = spark.read.parquet('/mnt/formula1storageedu/processed/constructors/') \
        .withColumnRenamed('name', 'team')

drivers_df = spark.read.parquet('/mnt/formula1storageedu/processed/drivers/') \
        .withColumnRenamed('number', 'driver_number') \
        .withColumnRenamed('name', 'driver_name') \
        .withColumnRenamed('nationality', 'driver_nationality')

races_df = spark.read.parquet('/mnt/formula1storageedu/processed/races/') \
        .withColumnRenamed('name', 'race_name') \
        .withColumnRenamed('race_timestamp', 'race_date')

results_df = spark.read.parquet('/mnt/formula1storageedu/processed/results/') \
        .withColumnRenamed('time', 'race_time')

In [0]:
circuits_races_df = circuits_df.join(races_df, circuits_df.circuit_id == races_df.circuit_id, 'inner') \
        .select(races_df.race_id, races_df.race_year, races_df.race_name, races_df.race_date, circuits_df.circuit_location)

In [0]:
race_results_df = circuits_races_df.join(results_df, circuits_races_df.race_id == results_df.race_id, 'inner') \
                                   .join(drivers_df, results_df.driver_id == drivers_df.driver_id, 'inner') \
                                   .join(constructors_df, results_df.constructor_id == constructors_df.constructor_id, 'inner')

In [0]:
from pyspark.sql.functions import current_timestamp, from_utc_timestamp

dataviz_df = race_results_df.select('race_year', 
                                    'race_name', 
                                    'race_date', 
                                    'circuit_location', 
                                    'driver_name', 
                                    'driver_number', 
                                    'driver_nationality',
                                    'team',
                                    'grid',
                                    'fastest_lap',
                                    'race_time',
                                    'points') \
                            .withColumn('creation_date', from_utc_timestamp(current_timestamp(), 'America/Sao_Paulo'))

In [0]:
dataviz_df.dtypes

Out[5]: [('race_year', 'int'),
 ('race_name', 'string'),
 ('race_date', 'timestamp'),
 ('circuit_location', 'string'),
 ('driver_name', 'string'),
 ('driver_number', 'string'),
 ('driver_nationality', 'string'),
 ('team', 'string'),
 ('grid', 'int'),
 ('fastest_lap', 'int'),
 ('race_time', 'string'),
 ('points', 'float'),
 ('creation_date', 'timestamp')]

In [0]:
from pyspark.sql.functions import when

dataviz_df = dataviz_df.withColumn("race_time", when(dataviz_df.race_time == r"\N", None).otherwise(dataviz_df.race_time))

In [0]:
#This result is exactly the same as: https://www.bbc.com/sport/formula1/2022/abu-dhabi-grand-prix/results
display(dataviz_df.filter("race_year == 2022 and race_name == 'Abu Dhabi Grand Prix'").orderBy(dataviz_df.points.desc()).limit(5))

race_year,race_name,race_date,circuit_location,driver_name,driver_number,driver_nationality,team,grid,fastest_lap,race_time,points,creation_date
2022,Abu Dhabi Grand Prix,2022-11-20T13:00:00.000+0000,Abu Dhabi,Max Verstappen,33,Dutch,Red Bull,1,54,1:27:45.914,25.0,2023-05-09T08:32:48.976+0000
2022,Abu Dhabi Grand Prix,2022-11-20T13:00:00.000+0000,Abu Dhabi,Charles Leclerc,16,Monegasque,Ferrari,3,48,+8.771,18.0,2023-05-09T08:32:48.976+0000
2022,Abu Dhabi Grand Prix,2022-11-20T13:00:00.000+0000,Abu Dhabi,Sergio Pérez,11,Mexican,Red Bull,2,52,+10.093,15.0,2023-05-09T08:32:48.976+0000
2022,Abu Dhabi Grand Prix,2022-11-20T13:00:00.000+0000,Abu Dhabi,Carlos Sainz,55,Spanish,Ferrari,4,50,+24.892,12.0,2023-05-09T08:32:48.976+0000
2022,Abu Dhabi Grand Prix,2022-11-20T13:00:00.000+0000,Abu Dhabi,George Russell,63,British,Mercedes,6,48,+35.888,10.0,2023-05-09T08:32:48.976+0000


In [0]:
dataviz_df.write.mode('overwrite').parquet('/mnt/formula1storageedu/dataviz/race_results')